diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 478 |
1 files changed, 333 insertions, 145 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index b48443526c..9c503d6d13 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -18,19 +18,24 @@ #include "Cluster.h" #include "Connection.h" +#include "DumpClient.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/Connection.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQP_AllOperations.h" +#include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterDumpRequestBody.h" -#include "qpid/framing/ClusterUpdateBody.h" #include "qpid/framing/ClusterReadyBody.h" +#include "qpid/framing/ClusterDumpOfferBody.h" +#include "qpid/framing/ClusterDumpStartBody.h" #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Thread.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include "qmf/org/apache/qpid/cluster/Package.h" @@ -55,156 +60,221 @@ using qpid::management::Manageable; using qpid::management::Args; namespace qmf = qmf::org::apache::qpid::cluster; +/**@file + Threading notes: + - Public functions may be called in local connection IO threads. + see .h. +*/ + +struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { + qpid::cluster::Cluster& cluster; + MemberId member; + Cluster::Lock& l; + ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} + + void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); } + void ready(const std::string& url) { cluster.ready(member, url, l); } + void dumpOffer(uint64_t dumpee) { cluster.dumpOffer(member, dumpee, l); } + void dumpStart(uint64_t dumpee, const std::string& url) { cluster.dumpStart(member, dumpee, url, l); } + void shutdown() { cluster.shutdown(member, l); } + + bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } +}; + Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : broker(b), poller(b.getPoller()), cpg(*this), name(name_), - url(url_), - self(cpg.self()), - cpgDispatchHandle(cpg, - boost::bind(&Cluster::dispatch, this, _1), // read - 0, // write - boost::bind(&Cluster::disconnect, this, _1) // disconnect + myUrl(url_), + memberId(cpg.self()), + cpgDispatchHandle( + cpg, + boost::bind(&Cluster::dispatch, this, _1), // read + 0, // write + boost::bind(&Cluster::disconnect, this, _1) // disconnect ), - connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), + deliverQueue(boost::bind(&Cluster::process, this, _1), poller), + mcastId(0), mgmtObject(0), - handler(&joiningHandler), - joiningHandler(*this), - memberHandler(*this), - mcastId(), - lastSize(1) + state(INIT), + lastSize(1) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0){ qmf::Package packageInit(agent); - mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),url.str()); + mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),myUrl.str()); agent->addObject (mgmtObject); mgmtObject->set_status("JOINING"); - // FIXME aconway 2008-09-24: // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined. } - QPID_LOG(notice, self << " joining cluster " << name.str()); broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); cpgDispatchHandle.startWatch(poller); cpg.join(name); + QPID_LOG(notice, *this << " joining cluster " << name.str()); } -Cluster::~Cluster() {} +Cluster::~Cluster() { + if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. +} void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { - Mutex::ScopedLock l(lock); + Lock l(lock); + assert(!c->isCatchUp()); connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); } -void Cluster::dumpComplete() { handler->dumpComplete(); } - void Cluster::erase(ConnectionId id) { - Mutex::ScopedLock l(lock); + Lock l(lock); connections.erase(id); } -void Cluster::leave() { - QPID_LOG(notice, self << " leaving cluster " << name.str()); - cpg.leave(name); - // Defer shut down to the final configChange when the group knows we've left. +void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) { + Lock l(lock); + mcastControl(body, cptr, l); } -void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) { - AMQFrame f(body); - Event e(CONTROL, ConnectionId(self, cptr), f.size(), ++mcastId); - Buffer buf(e); - f.encode(buf); - QPID_LOG(trace, "MCAST " << e << " " << body); - mcastEvent(e); +void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr, Lock&) { + Lock l(lock); + Event e(Event::control(body, ConnectionId(memberId, cptr), ++mcastId)); + QPID_LOG(trace, *this << " MCAST " << e << ": " << body); + mcast(e, l); } void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id) { + Lock l(lock); + mcastBuffer(data, size, connection, id, l); +} + +void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id, Lock&) { + Lock l(lock); Event e(DATA, connection, size, id); memcpy(e.getData(), data, size); - QPID_LOG(trace, "MCAST " << e); - mcastEvent(e); + QPID_LOG(trace, *this << " MCAST " << e); + mcast(e, l); } -void Cluster::mcastEvent(const Event& e) { - e.mcast(name, cpg); -} +void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); } -size_t Cluster::size() const { - Mutex::ScopedLock l(lock); - return map.size(); +void Cluster::mcast(const Event& e, Lock&) { + if (state == LEFT) return; + if (state < READY && e.isConnection()) { + // Stall outgoing connection events. + QPID_LOG(trace, *this << " MCAST deferred: " << e ); + mcastQueue.push_back(e); + } + else + e.mcast(name, cpg); } std::vector<Url> Cluster::getUrls() const { - Mutex::ScopedLock l(lock); + Lock l(lock); + return getUrls(l); +} + +std::vector<Url> Cluster::getUrls(Lock&) const { return map.memberUrls(); } -// FIXME aconway 2008-09-15: volatile for locked/unlocked functions. -// Check locking from Handler functions. -boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) { - Mutex::ScopedLock l(lock); - if (id.getMember() == self) - return boost::intrusive_ptr<Connection>(id.getConnectionPtr()); - ConnectionMap::iterator i = connections.find(id); +void Cluster::leave() { + Lock l(lock); + leave(l); +} + +void Cluster::leave(Lock&) { + if (state != LEFT) { + state = LEFT; + QPID_LOG(notice, *this << " leaving cluster " << name.str()); + + if (!deliverQueue.isStopped()) deliverQueue.stop(); + if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); + try { cpg.leave(name); } + catch (const std::exception& e) { + QPID_LOG(critical, *this << " error leaving process group: " << e.what()); + } + try { broker.shutdown(); } + catch (const std::exception& e) { + QPID_LOG(critical, *this << " error during shutdown, aborting: " << e.what()); + abort(); // Big trouble. + } + } +} + +boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) { + if (connectionId.getMember() == memberId) + return boost::intrusive_ptr<Connection>(connectionId.getPointer()); + ConnectionMap::iterator i = connections.find(connectionId); if (i == connections.end()) { // New shadow connection. - assert(id.getMember() != self); + assert(connectionId.getMember() != memberId); std::ostringstream mgmtId; - mgmtId << name.str() << ":" << id; - ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id)); + mgmtId << name.str() << ":" << connectionId; + ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId)); i = connections.insert(value).first; } return i->second; } -void Cluster::deliver( +Cluster::Connections Cluster::getConnections(Lock&) { + Connections result(connections.size()); + std::transform(connections.begin(), connections.end(), result.begin(), + boost::bind(&ConnectionMap::value_type::second, _1)); + return result; +} + +void Cluster::deliver( cpg_handle_t /*handle*/, cpg_name* /*group*/, uint32_t nodeid, uint32_t pid, void* msg, - int msg_len) + int msg_len) { - try { - MemberId from(nodeid, pid); - Event e = Event::delivered(from, msg, msg_len); + Mutex::ScopedLock l(lock); + MemberId from(nodeid, pid); + Event e = Event::delivered(from, msg, msg_len); + if (state == LEFT) return; + QPID_LOG(trace, *this << " DLVR: " << e); + if (e.isCluster() && state != DUMPEE) // Process cluster controls immediately unless in DUMPEE state. + process(e, l); + else if (state != NEWBIE) // Newbie discards events up to the dump offer. + deliverQueue.push(e); +} + +void Cluster::process(const Event& e) { + Lock l(lock); + process(e,l); +} - // Process cluster controls immediately - if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control - Buffer buf(e); - AMQFrame frame; +void Cluster::process(const Event& e, Lock& l) { + try { + Buffer buf(e); + AMQFrame frame; + if (e.isCluster()) { while (frame.decode(buf)) { - QPID_LOG(trace, "DLVR " << e << " " << frame); - if (!handler->invoke(e.getConnectionId().getMember(), frame)) + QPID_LOG(trace, *this << " PROC: " << e << " " << frame); + ClusterDispatcher dispatch(*this, e.getMemberId(), l); + if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); } } - else { - QPID_LOG(trace, "DLVR" << (connectionEventQueue.isStopped() ? "(stalled)" : "") << " " << e); - handler->deliver(e); + else { // e.isConnection() + boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); + if (e.getType() == DATA) { + QPID_LOG(trace, *this << " PROC: " << e); + connection->deliverBuffer(buf); + } + else { // control + while (frame.decode(buf)) { + QPID_LOG(trace, *this << " PROC: " << e << " " << frame); + connection->delivered(frame); + } + } } } catch (const std::exception& e) { - QPID_LOG(critical, "Error in cluster deliver: " << e.what()); - leave(); - } -} - -void Cluster::connectionEvent(const Event& e) { - Buffer buf(e); - boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId()); - assert(connection); - if (e.getType() == DATA) { - QPID_LOG(trace, "EXEC: " << e); - connection->deliverBuffer(buf); - } - else { // control - AMQFrame frame; - while (frame.decode(buf)) { - QPID_LOG(trace, "EXEC " << e << " " << frame); - connection->delivered(frame); - } + QPID_LOG(critical, *this << " error in cluster process: " << e.what()); + leave(l); } } @@ -236,16 +306,22 @@ ostream& operator<<(ostream& o, const AddrList& a) { } void Cluster::dispatch(sys::DispatchHandle& h) { - cpg.dispatchAll(); - h.rewatch(); + try { + cpg.dispatchAll(); + h.rewatch(); + } + catch (const std::exception& e) { + QPID_LOG(critical, *this << " error in cluster deliver: " << e.what()); + leave(); + } } void Cluster::disconnect(sys::DispatchHandle& ) { - QPID_LOG(critical, self << " unexpectedly disconnected from cluster, shutting down"); + QPID_LOG(critical, *this << " disconnected from cluster, shutting down"); broker.shutdown(); } -void Cluster::configChange( +void Cluster::configChange ( cpg_handle_t /*handle*/, cpg_name */*group*/, cpg_address *current, int nCurrent, @@ -253,49 +329,57 @@ void Cluster::configChange( cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - QPID_LOG(debug, "Process members: " << AddrList(current, nCurrent) + QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); - - if (find(left, left+nLeft, self) != left+nLeft) { - // I have left the group, this is the final config change. - QPID_LOG(notice, self << " left cluster " << name.str()); - broker.shutdown(); - return; + map.configChange(current, nCurrent, left, nLeft, joined, nJoined); + updateMemberStats(l); + if (state == LEFT) return; + if (!map.isAlive(memberId)) { leave(l); return; } + + if(state == INIT) { // First configChange + if (map.aliveCount() == 1) { + QPID_LOG(info, *this << " first in cluster at " << myUrl); + map = ClusterMap(memberId, myUrl, true); + unstall(l); + } + else { // Joining established group. + state = NEWBIE; + mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), 0, l); + QPID_LOG(debug, *this << " send dump-request " << myUrl); + } } - - if (map.left(left, nLeft)) updateMemberStats(); - handler->configChange(current, nCurrent, left, nLeft, joined, nJoined); } - -broker::Broker& Cluster::getBroker(){ return broker; } - -void Cluster::stall() { - Mutex::ScopedLock l(lock); - QPID_LOG(debug, self << " stalling."); - // Stop processing connection events. We still process config changes - // and cluster controls in deliver() - connectionEventQueue.stop(); - if (mgmtObject!=0) mgmtObject->set_status("STALLED"); - - // FIXME aconway 2008-09-11: Flow control, we should slow down or - // stop reading from local connections while stalled to avoid an - // unbounded queue. +void Cluster::dumpInDone(const ClusterMap& m) { + Lock l(lock); + dumpedMap = m; + checkDumpIn(l); } -void Cluster::ready() { - // Called with lock held - QPID_LOG(debug, self << " ready at " << url); - unstall(); - mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); +void Cluster::tryMakeOffer(const MemberId& id, Lock& l) { + if (state == READY && map.isNewbie(id)) { + state = OFFER; + QPID_LOG(debug, *this << " send dump-offer to " << id); + mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), 0, l); + } } -void Cluster::unstall() { +void Cluster::unstall(Lock& l) { // Called with lock held - QPID_LOG(debug, self << " un-stalling"); - handler = &memberHandler; // Member mode. - connectionEventQueue.start(poller); - if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); + switch (state) { + case INIT: case DUMPEE: case DUMPER: + QPID_LOG(debug, *this << " unstall: deliver=" << deliverQueue.size() + << " mcast=" << mcastQueue.size()); + deliverQueue.start(); + state = READY; + for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l))); + mcastQueue.clear(); + if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); + break; + case LEFT: break; + case NEWBIE: case READY: case OFFER: + assert(0); + } } // Called from Broker::~Broker when broker is shut down. At this @@ -303,17 +387,106 @@ void Cluster::unstall() { // invoked. We must ensure that CPG has also shut down so no CPG // callbacks will be invoked. // -void Cluster::brokerShutdown() { - QPID_LOG(notice, self << " shutting down."); - try { cpg.shutdown(); } - catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); } +void Cluster::brokerShutdown() { + QPID_LOG(notice, *this << " shutting down "); + if (state != LEFT) { + try { cpg.shutdown(); } + catch (const std::exception& e) { + QPID_LOG(error, *this << " during shutdown: " << e.what()); + } + } delete this; } -ManagementObject* Cluster::GetManagementObject(void) const { return mgmtObject; } +void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { + map.dumpRequest(id, url); + tryMakeOffer(id, l); +} + +void Cluster::ready(const MemberId& id, const std::string& url, Lock&) { + map.ready(id, Url(url)); +} + +void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { + if (state == LEFT) return; + MemberId dumpee(dumpeeInt); + boost::optional<Url> url = map.dumpOffer(dumper, dumpee); + if (dumper == memberId) { + assert(state == OFFER); + if (url) { // My offer was first. + QPID_LOG(debug, *this << " mark dump point for dump to " << dumpee); + // Put dump-start on my own deliver queue to mark the stall point. + // We will stall when it is processed. + deliverQueue.push(Event::control(ClusterDumpStartBody(ProtocolVersion(), dumpee, url->str()), memberId)); + } + else { // Another offer was first. + QPID_LOG(debug, *this << " cancel dump offer to " << dumpee); + state = READY; + tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer. + } + } + else if (dumpee == memberId && url) { + assert(state == NEWBIE); + QPID_LOG(debug, *this << " accepted dump-offer from " << dumper); + state = DUMPEE; + checkDumpIn(l); + } +} + +void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock& l) { + if (state == LEFT) return; + MemberId dumpee(dumpeeInt); + Url url(urlStr); + assert(state == OFFER); + deliverQueue.stop(); + QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " << urlStr); + state = DUMPER; + if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. + dumpThread = Thread( + new DumpClient(memberId, dumpee, url, broker, map, getConnections(l), + boost::bind(&Cluster::dumpOutDone, this), + boost::bind(&Cluster::dumpOutError, this, _1))); +} + +void Cluster::checkDumpIn(Lock& l) { + if (state == LEFT) return; + assert(state == DUMPEE || state == NEWBIE); + if (state == DUMPEE && dumpedMap) { + map = *dumpedMap; + QPID_LOG(debug, *this << " incoming dump complete. Members: " << map); + unstall(l); + mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l); + } +} + +void Cluster::dumpOutDone() { + Monitor::ScopedLock l(lock); + dumpOutDone(l); +} + +void Cluster::dumpOutDone(Lock& l) { + QPID_LOG(debug, *this << " finished sending dump."); + assert(state == DUMPER); + unstall(l); + tryMakeOffer(map.firstNewbie(), l); // Try another offer +} + +void Cluster::dumpOutError(const std::exception& e) { + Monitor::ScopedLock l(lock); + QPID_LOG(error, *this << " error sending state dump: " << e.what()); + dumpOutDone(l); +} + +void Cluster ::shutdown(const MemberId& id, Lock& l) { + QPID_LOG(notice, *this << " received shutdown from " << id); + leave(l); +} + +ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; } Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) { - QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); + Lock l(lock); + QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break; case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break; @@ -322,30 +495,32 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string return Manageable::STATUS_OK; } -void Cluster::stopClusterNode(void) { - QPID_LOG(notice, self << " stopped by admin"); +void Cluster::stopClusterNode() { + QPID_LOG(notice, *this << " stopped by admin"); leave(); } -void Cluster::stopFullCluster(void) { - QPID_LOG(notice, self << " sending shutdown to cluster."); - mcastControl(ClusterShutdownBody(), 0); +void Cluster::stopFullCluster() { + Lock l(lock); + QPID_LOG(notice, *this << " shutting down cluster " << name.str()); + mcastControl(ClusterShutdownBody(), 0, l); } -void Cluster::updateMemberStats() { +void Cluster::updateMemberStats(Lock& l) { if (mgmtObject) { - if (lastSize != size() && size() ==1){ - QPID_LOG(info, "Last node standing, updating queue policies, size:" <<size()); - broker.getQueues().updateQueueClusterState(true); - lastSize = size(); - }else if (lastSize != size() && size() > 1) { - QPID_LOG(info, "Recover back from last node standing, updating queue policies, size:" <<size()); - broker.getQueues().updateQueueClusterState(false); - lastSize = size(); - } + std::vector<Url> vectUrl = getUrls(l); + size_t size = vectUrl.size(); + if (lastSize != size && size == 1){ + QPID_LOG(info, *this << " last node standing, updating queue policies."); + broker.getQueues().updateQueueClusterState(true); + } + else if (lastSize != size && size > 1) { + QPID_LOG(info, *this << " recovered from last node standing, updating queue policies, size:" << size); + broker.getQueues().updateQueueClusterState(false); + } + lastSize = size; - mgmtObject->set_clusterSize(size()); - std::vector<Url> vectUrl = getUrls(); + mgmtObject->set_clusterSize(size); string urlstr; for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) { if (iter != vectUrl.begin()) urlstr += "\n"; @@ -355,4 +530,17 @@ void Cluster::updateMemberStats() { } } +std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { + static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "READY", "OFFER", "DUMPER", "LEFT" }; + return o << cluster.memberId << "(" << STATE[cluster.state] << ")"; +} + +MemberId Cluster::getId() const { + return memberId; // Immutable, no need to lock. +} + +broker::Broker& Cluster::getBroker() const { + return broker; // Immutable, no need to lock. +} + }} // namespace qpid::cluster |
