diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 79 |
1 files changed, 52 insertions, 27 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 78f7bf13fc..467c960674 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -144,6 +144,13 @@ using qpid::management::Manageable; using qpid::management::Args; namespace _qmf = ::qmf::org::apache::qpid::cluster; +/** NOTE: increment this number whenever any incompatible changes in + * cluster protocol/behavior are made. It allows early detection and + * sensible reporting of an attempt to mix different versions in a + * cluster. + */ +const uint32_t Cluster::CLUSTER_VERSION = 1; + struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; MemberId member; @@ -153,7 +160,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } - void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } + void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { cluster.updateOffer(member, updatee, id, version, l); } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } @@ -233,6 +240,7 @@ void Cluster::initialize() { // Called in connection thread to insert a client connection. void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { + QPID_LOG(debug, *this << " add local connection " << c->getId()); localConnections.insert(c); assert(c->getId().getMember() == self); // Announce the connection to the cluster. @@ -242,11 +250,14 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { // Called in connection thread to insert an updated shadow connection. void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { + QPID_LOG(debug, *this << " add shadow connection " << c->getId()); // Safe to use connections here because we're pre-catchup, either // discarding or stalled, so deliveredFrame is not processing any // connection events. assert(discarding); - connections.insert(ConnectionMap::value_type(c->getId(), c)); + pair<ConnectionMap::iterator, bool> ib + = connections.insert(ConnectionMap::value_type(c->getId(), c)); + assert(ib.second); } void Cluster::erase(const ConnectionId& id) { @@ -317,11 +328,11 @@ void Cluster::deliver( } LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");) -LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");) + LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");) -void Cluster::deliverEvent(const Event& e) { + void Cluster::deliverEvent(const Event& e) { LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());) - deliverEventQueue.push(e); + deliverEventQueue.push(e); } void Cluster::deliverFrame(const EventFrame& e) { @@ -339,16 +350,21 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData())); - QPID_LOG(trace, *this << " DLVR: " << e); if (e.isCluster()) { + QPID_LOG(trace, *this << " DLVR: " << e); EventFrame ef(e, e.getFrame()); // Stop the deliverEventQueue on update offers. // This preserves the connection decoder fragments for an update. - if (castUpdateOffer(ef.frame.getBody())) + const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody()); + if (offer) { + QPID_LOG(debug, *this << " stall for update offer from " << e.getMemberId() + << " to " << MemberId(offer->getUpdatee())); deliverEventQueue.stop(); + } deliverFrame(ef); } else if(!discarding) { + QPID_LOG(trace, *this << " DLVR: " << e); if (e.isControl()) deliverFrame(EventFrame(e, e.getFrame())); else { @@ -403,9 +419,8 @@ LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");) while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); } - else { + else processFrame(e, l); - } } @@ -447,7 +462,7 @@ ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) { mgmtId << id; cp = new Connection(*this, shadowOut, mgmtId.str(), id); } - connections.insert(ConnectionMap::value_type(id, cp)); + connections.insert(ConnectionMap::value_type(id, cp)); } return cp; } @@ -556,7 +571,8 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; QPID_LOG(info, *this << " send update-offer to " << id); - mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self); + mcast.mcastControl( + ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId, CLUSTER_VERSION), self); } } @@ -587,26 +603,29 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } } -// Go back to normal processing after an offer that did not result in an update. -void Cluster::cancelOffer(const MemberId& updatee, Lock& l) { - QPID_LOG(info, *this << " cancelled offer to " << updatee); - deliverEventQueue.start(); // Go back to normal processing - setReady(l); - makeOffer(map.firstJoiner(), l); // Maybe make another offer. -} - -void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) { +void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, + uint32_t version, Lock& l) { // NOTE: deliverEventQueue has been stopped at the update offer by // deliveredEvent in case an update is required. if (state == LEFT) return; + if (version != CLUSTER_VERSION) { + QPID_LOG(critical, *this << " incompatible cluster versions " << + version << " != " << CLUSTER_VERSION); + leave(l); + return; + } MemberId updatee(updateeInt); boost::optional<Url> url = map.updateOffer(updater, updatee); if (updater == self) { assert(state == OFFER); if (url) // My offer was first. updateStart(updatee, *url, l); - else // Another offer was first. - cancelOffer(updatee, l); + else { // Another offer was first. + QPID_LOG(info, *this << " cancelled offer to " << updatee << " unstall"); + setReady(l); + makeOffer(map.firstJoiner(), l); // Maybe make another offer. + deliverEventQueue.start(); // Go back to normal processing + } } else if (updatee == self && url) { assert(state == JOINER); @@ -615,8 +634,11 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu QPID_LOG(info, *this << " receiving update from " << updater); checkUpdateIn(l); } - else + else { + QPID_LOG(debug,*this << " unstall, ignore update " << updater + << " to " << updatee); deliverEventQueue.start(); // Not involved in update. + } } static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) { @@ -629,21 +651,23 @@ static client::ConnectionSettings connectionSettings(const ClusterSettings& sett void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { // An offer was received while handling an error, and converted to a retract. + // Behavior is very similar to updateOffer. if (state == LEFT) return; MemberId updatee(updateeInt); boost::optional<Url> url = map.updateOffer(updater, updatee); if (updater == self) { assert(state == OFFER); if (url) { // My offer was first. - QPID_LOG(info, *this << " retracted offer to " << updatee); + QPID_LOG(info, *this << " retracting offer to " << updatee); if (updateThread.id()) updateThread.join(); // Join the previous updateThread to avoid leaks. updateThread = Thread(new RetractClient(*url, connectionSettings(settings))); } - cancelOffer(updatee, l); + setReady(l); + makeOffer(map.firstJoiner(), l); // Maybe make another offer. + // Don't unstall the event queue, that was already done in deliveredFrame } - else - deliverEventQueue.start(); // Not involved in update. + QPID_LOG(debug,*this << " retracted offer " << updater << " to " << updatee); } void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { @@ -672,6 +696,7 @@ void Cluster::updateInDone(const ClusterMap& m) { void Cluster::updateInRetracted() { Lock l(lock); updateRetracted = true; + map.clearStatus(); checkUpdateIn(l); } |
