diff options
| author | Alan Conway <aconway@apache.org> | 2008-10-16 17:07:26 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-10-16 17:07:26 +0000 |
| commit | d39a165c9c8d1fa2fd728a2237117efa71848874 (patch) | |
| tree | dd07b81f1f2d2de42ce2fdf28432130566a5622e /cpp/src/qpid/cluster/Cluster.cpp | |
| parent | f7a4f7bcf77726767d0905f56f5c44c7a34d82a3 (diff) | |
| download | qpid-python-d39a165c9c8d1fa2fd728a2237117efa71848874.tar.gz | |
Fix race in cluster causing incorrect known-broker lists to be sent to clients.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705287 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 214 |
1 file changed, 115 insertions, 99 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 811c1c9557..a0bcb9ae02 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -30,6 +30,7 @@ #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterDumpRequestBody.h" #include "qpid/framing/ClusterReadyBody.h" +#include "qpid/framing/ClusterConfigChangeBody.h" #include "qpid/framing/ClusterDumpOfferBody.h" #include "qpid/framing/ClusterDumpStartBody.h" #include "qpid/framing/ClusterShutdownBody.h" @@ -76,6 +77,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } + void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } void dumpOffer(uint64_t dumpee) { cluster.dumpOffer(member, dumpee, l); } void dumpStart(uint64_t dumpee, const std::string& url) { cluster.dumpStart(member, dumpee, url, l); } void shutdown() { cluster.shutdown(member, l); } @@ -89,14 +91,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : cpg(*this), name(name_), myUrl(url_), - memberId(cpg.self()), + myId(cpg.self()), cpgDispatchHandle( cpg, boost::bind(&Cluster::dispatch, this, _1), // read 0, // write boost::bind(&Cluster::disconnect, this, _1) // disconnect ), - deliverQueue(boost::bind(&Cluster::process, this, _1), poller), + deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller), mcastId(0), mgmtObject(0), state(INIT), @@ -115,20 +117,20 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : failoverExchange.reset(new FailoverExchange(this)); broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); cpgDispatchHandle.startWatch(poller); + deliverQueue.start(); cpg.join(name); - QPID_LOG(notice, *this << " joining cluster " << name.str()); + QPID_LOG(notice, *this 
<< " will join cluster " << name.str()); } Cluster::~Cluster() { if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. } -void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { +bool Cluster::insert(const boost::intrusive_ptr<Connection>& c) { Lock l(lock); - // FIXME aconway 2008-10-08: what keeps catchUp connections in memory if not in map? - // esp shadow connections? See race comment in getConnection. - assert(!c->isCatchUp()); - connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); + bool result = connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)).second; + assert(result); + return result; } void Cluster::erase(ConnectionId id) { @@ -136,14 +138,19 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } -void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) { +void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq) { Lock l(lock); - mcastControl(body, cptr, l); + mcastControl(body, id, seq, l); } -void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr, Lock&) { - Lock l(lock); - Event e(Event::control(body, ConnectionId(memberId, cptr), ++mcastId)); +void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq, Lock& l) { + Event e(Event::control(body, id, seq)); + QPID_LOG(trace, *this << " MCAST " << e << ": " << body); + mcast(e, l); +} + +void Cluster::mcastControl(const framing::AMQBody& body, Lock& l) { + Event e(Event::control(body, ConnectionId(myId,0), ++mcastId)); QPID_LOG(trace, *this << " MCAST " << e << ": " << body); mcast(e, l); } @@ -166,8 +173,8 @@ void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); } void Cluster::mcast(const Event& e, Lock&) { if (state == LEFT) return; - if (state < READY && e.isConnection()) { - // Stall outgoing connection events. 
+ if (state <= CATCHUP && e.isConnection()) { + // Stall outgoing connection events untill we are fully READY QPID_LOG(trace, *this << " MCAST deferred: " << e ); mcastQueue.push_back(e); } @@ -192,10 +199,10 @@ void Cluster::leave() { void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; + if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); QPID_LOG(notice, *this << " leaving cluster " << name.str()); if (!deliverQueue.isStopped()) deliverQueue.stop(); - if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); try { cpg.leave(name); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error leaving process group: " << e.what()); @@ -211,14 +218,15 @@ void Cluster::leave(Lock&) { boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) { ConnectionMap::iterator i = connections.find(connectionId); if (i == connections.end()) { - if (connectionId.getMember() == memberId) { // Closed local connection + if (connectionId.getMember() == myId) { // Closed local connection QPID_LOG(warning, *this << " attempt to use closed connection " << connectionId); return boost::intrusive_ptr<Connection>(); } else { // New shadow connection std::ostringstream mgmtId; mgmtId << name.str() << ":" << connectionId; - ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId)); + ConnectionMap::value_type value(connectionId, + new Connection(*this, shadowOut, mgmtId.str(), connectionId)); i = connections.insert(value).first; } } @@ -242,50 +250,54 @@ void Cluster::deliver( { Mutex::ScopedLock l(lock); MemberId from(nodeid, pid); - Event e = Event::delivered(from, msg, msg_len); + deliver(Event::delivered(from, msg, msg_len), l); +} + +void Cluster::deliver(const Event& e, Lock&) { if (state == LEFT) return; - QPID_LOG(trace, *this << " DLVR: " << e); - if (e.isCluster() && state != DUMPEE) // Process cluster controls immediately unless in DUMPEE state. 
- process(e, l); - else if (state != NEWBIE) // Newbie discards events up to the dump offer. - deliverQueue.push(e); + QPID_LOG(trace, *this << " PUSH: " << e); + deliverQueue.push(e); // Otherwise enqueue for processing. } -void Cluster::process(const Event& e) { +void Cluster::delivered(const Event& e) { Lock l(lock); - process(e,l); + delivered(e,l); } -void Cluster::process(const Event& e, Lock& l) { +void Cluster::delivered(const Event& e, Lock& l) { try { Buffer buf(e); AMQFrame frame; if (e.isCluster()) { while (frame.decode(buf)) { - QPID_LOG(trace, *this << " PROC: " << e << " " << frame); + QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); ClusterDispatcher dispatch(*this, e.getMemberId(), l); if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); } } else { // e.isConnection() - boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); - if (connection) { // Ignore if no connection. 
- if (e.getType() == DATA) { - QPID_LOG(trace, *this << " PROC: " << e); - connection->deliverBuffer(buf); - } - else { // control + if (state == NEWBIE) { + QPID_LOG(trace, *this << " DROP: " << e); + } + else { + boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); + if (!connection) return; + if (e.getType() == CONTROL) { while (frame.decode(buf)) { - QPID_LOG(trace, *this << " PROC: " << e << " " << frame); + QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); connection->delivered(frame); } } + else { + QPID_LOG(trace, *this << " DLVR: " << e); + connection->deliverBuffer(buf); + } } } } catch (const std::exception& e) { - QPID_LOG(critical, *this << " error in cluster process: " << e.what()); + QPID_LOG(critical, *this << " error in cluster delivered: " << e.what()); leave(l); } } @@ -304,11 +316,11 @@ ostream& operator<<(ostream& o, const AddrList& a) { for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { const char* reasonString; switch (p->reason) { - case CPG_REASON_JOIN: reasonString = " joined "; break; - case CPG_REASON_LEAVE: reasonString = " left "; break; - case CPG_REASON_NODEDOWN: reasonString = " node-down "; break; - case CPG_REASON_NODEUP: reasonString = " node-up "; break; - case CPG_REASON_PROCDOWN: reasonString = " process-down "; break; + case CPG_REASON_JOIN: reasonString = " (joined) "; break; + case CPG_REASON_LEAVE: reasonString = " (left) "; break; + case CPG_REASON_NODEDOWN: reasonString = " (node-down) "; break; + case CPG_REASON_NODEUP: reasonString = " (node-up) "; break; + case CPG_REASON_PROCDOWN: reasonString = " (process-down) "; break; default: reasonString = " "; } qpid::cluster::MemberId member(*p); @@ -338,61 +350,52 @@ void Cluster::configChange ( cpg_name */*group*/, cpg_address *current, int nCurrent, cpg_address *left, int nLeft, - cpg_address *joined, int nJoined) + cpg_address */*joined*/, int /*nJoined*/) { Mutex::ScopedLock l(lock); - QPID_LOG(debug, *this << " 
configuration change: " << AddrList(current, nCurrent) + QPID_LOG(debug, *this << " enqueue config change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); - map.configChange(current, nCurrent, left, nLeft, joined, nJoined); + std::string addresses; + for (cpg_address* p = current; p < current+nCurrent; ++p) + addresses.append(MemberId(*p).str()); + deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId), l); +} + +void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) { + bool memberChange = map.configChange(addresses); if (state == LEFT) return; - if (!map.isAlive(memberId)) { leave(l); return; } - if(state == INIT) { // First configChange - if (map.aliveCount() == 1) { + if (!map.isAlive(myId)) { // Final config change. + leave(l); + return; + } + + if (state == INIT) { // First configChange + if (map.aliveCount() == 1) { QPID_LOG(info, *this << " first in cluster at " << myUrl); - map = ClusterMap(memberId, myUrl, true); + state = READY; + if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); + map = ClusterMap(myId, myUrl, true); memberUpdate(l); - unstall(l); } else { // Joining established group. 
state = NEWBIE; - mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), 0, l); + mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l); QPID_LOG(debug, *this << " send dump-request " << myUrl); } } - else if (state >= READY) + else if (state >= READY && memberChange) memberUpdate(l); } -void Cluster::dumpInDone(const ClusterMap& m) { - Lock l(lock); - dumpedMap = m; - checkDumpIn(l); -} + + void Cluster::tryMakeOffer(const MemberId& id, Lock& l) { if (state == READY && map.isNewbie(id)) { state = OFFER; QPID_LOG(debug, *this << " send dump-offer to " << id); - mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), 0, l); - } -} - -void Cluster::unstall(Lock& l) { - // Called with lock held - switch (state) { - case INIT: case DUMPEE: case DUMPER: case READY: - QPID_LOG(debug, *this << " unstall: deliver=" << deliverQueue.size() - << " mcast=" << mcastQueue.size()); - deliverQueue.start(); - state = READY; - for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l))); - mcastQueue.clear(); - if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); - break; - case LEFT: break; - case NEWBIE: case OFFER: - assert(0); + mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), l); } } @@ -418,23 +421,25 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { } void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { - map.ready(id, Url(url)); - if (id == memberId) - unstall(l); - memberUpdate(l); + if (map.ready(id, Url(url))) + memberUpdate(l); + if (state == CATCHUP && id == myId) { + QPID_LOG(debug, *this << " caught-up, going to ready mode."); + state = READY; + if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); + for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l))); + mcastQueue.clear(); + } } void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { if (state == LEFT) 
return; MemberId dumpee(dumpeeInt); boost::optional<Url> url = map.dumpOffer(dumper, dumpee); - if (dumper == memberId) { + if (dumper == myId) { assert(state == OFFER); if (url) { // My offer was first. - QPID_LOG(debug, *this << " mark dump point for dump to " << dumpee); - // Put dump-start on my own deliver queue to mark the stall point. - // We will stall when it is processed. - deliverQueue.push(Event::control(ClusterDumpStartBody(ProtocolVersion(), dumpee, url->str()), memberId)); + dumpStart(myId, dumpee, url->str(), l); } else { // Another offer was first. QPID_LOG(debug, *this << " cancel dump offer to " << dumpee); @@ -442,38 +447,47 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer. } } - else if (dumpee == memberId && url) { + else if (dumpee == myId && url) { assert(state == NEWBIE); QPID_LOG(debug, *this << " accepted dump-offer from " << dumper); state = DUMPEE; + deliverQueue.stop(); checkDumpIn(l); } } +// FIXME aconway 2008-10-15: no longer need a separate control now +// that the dump control is in the deliver queue. void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock& l) { if (state == LEFT) return; MemberId dumpee(dumpeeInt); Url url(urlStr); assert(state == OFFER); + state = DUMPER; deliverQueue.stop(); QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " << urlStr); - state = DUMPER; if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. 
dumpThread = Thread( - new DumpClient(memberId, dumpee, url, broker, map, getConnections(l), + new DumpClient(myId, dumpee, url, broker, map, getConnections(l), boost::bind(&Cluster::dumpOutDone, this), boost::bind(&Cluster::dumpOutError, this, _1))); } +void Cluster::dumpInDone(const ClusterMap& m) { + Lock l(lock); + dumpedMap = m; + checkDumpIn(l); +} + void Cluster::checkDumpIn(Lock& l) { if (state == LEFT) return; - assert(state == DUMPEE || state == NEWBIE); if (state == DUMPEE && dumpedMap) { map = *dumpedMap; - QPID_LOG(debug, *this << " incoming dump complete. Members: " << map); - mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l); - state = READY; - // unstall when ready control is self-delivered. + QPID_LOG(debug, *this << " incoming dump complete, start catchup. map=" << map); + mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l); + // Don't flush the mcast queue till we are READY, on self-deliver. + state = CATCHUP; + deliverQueue.start(); } } @@ -485,7 +499,8 @@ void Cluster::dumpOutDone() { void Cluster::dumpOutDone(Lock& l) { QPID_LOG(debug, *this << " finished sending dump."); assert(state == DUMPER); - unstall(l); + state = READY; + deliverQueue.start(); tryMakeOffer(map.firstNewbie(), l); // Try another offer } @@ -504,7 +519,7 @@ ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; } Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) { Lock l(lock); - QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]"); + QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break; case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break; @@ -520,10 +535,11 @@ void Cluster::stopClusterNode(Lock&) { void Cluster::stopFullCluster(Lock& l) { QPID_LOG(notice, *this << " shutting down cluster " << name.str()); - mcastControl(ClusterShutdownBody(), 0, 
l); + mcastControl(ClusterShutdownBody(), l); } void Cluster::memberUpdate(Lock& l) { + QPID_LOG(debug, *this << " member update, map=" << map); std::vector<Url> vectUrl = getUrls(l); size_t size = vectUrl.size(); @@ -552,12 +568,12 @@ void Cluster::memberUpdate(Lock& l) { } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { - static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "READY", "OFFER", "DUMPER", "LEFT" }; - return o << cluster.memberId << "(" << STATE[cluster.state] << ")"; + static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "CATCHUP", "READY", "OFFER", "DUMPER", "LEFT" }; + return o << cluster.myId << "(" << STATE[cluster.state] << ")"; } MemberId Cluster::getId() const { - return memberId; // Immutable, no need to lock. + return myId; // Immutable, no need to lock. } broker::Broker& Cluster::getBroker() const { |
