summary | refs | log | tree | commit | diff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- cpp/src/qpid/cluster/Cluster.cpp 79
1 files changed, 52 insertions, 27 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 78f7bf13fc..467c960674 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -144,6 +144,13 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = ::qmf::org::apache::qpid::cluster;
+/** NOTE: increment this number whenever any incompatible changes in
+ * cluster protocol/behavior are made. It allows early detection and
+ * sensible reporting of an attempt to mix different versions in a
+ * cluster.
+ */
+const uint32_t Cluster::CLUSTER_VERSION = 1;
+
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
MemberId member;
@@ -153,7 +160,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
- void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
+ void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { cluster.updateOffer(member, updatee, id, version, l); }
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
@@ -233,6 +240,7 @@ void Cluster::initialize() {
// Called in connection thread to insert a client connection.
void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
+ QPID_LOG(debug, *this << " add local connection " << c->getId());
localConnections.insert(c);
assert(c->getId().getMember() == self);
// Announce the connection to the cluster.
@@ -242,11 +250,14 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
+ QPID_LOG(debug, *this << " add shadow connection " << c->getId());
// Safe to use connections here because we're pre-catchup, either
// discarding or stalled, so deliveredFrame is not processing any
// connection events.
assert(discarding);
- connections.insert(ConnectionMap::value_type(c->getId(), c));
+ pair<ConnectionMap::iterator, bool> ib
+ = connections.insert(ConnectionMap::value_type(c->getId(), c));
+ assert(ib.second);
}
void Cluster::erase(const ConnectionId& id) {
@@ -317,11 +328,11 @@ void Cluster::deliver(
}
LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");)
-LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");)
+ LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");)
-void Cluster::deliverEvent(const Event& e) {
+ void Cluster::deliverEvent(const Event& e) {
LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());)
- deliverEventQueue.push(e);
+ deliverEventQueue.push(e);
}
void Cluster::deliverFrame(const EventFrame& e) {
@@ -339,16 +350,21 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData()));
- QPID_LOG(trace, *this << " DLVR: " << e);
if (e.isCluster()) {
+ QPID_LOG(trace, *this << " DLVR: " << e);
EventFrame ef(e, e.getFrame());
// Stop the deliverEventQueue on update offers.
// This preserves the connection decoder fragments for an update.
- if (castUpdateOffer(ef.frame.getBody()))
+ const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
+ if (offer) {
+ QPID_LOG(debug, *this << " stall for update offer from " << e.getMemberId()
+ << " to " << MemberId(offer->getUpdatee()));
deliverEventQueue.stop();
+ }
deliverFrame(ef);
}
else if(!discarding) {
+ QPID_LOG(trace, *this << " DLVR: " << e);
if (e.isControl())
deliverFrame(EventFrame(e, e.getFrame()));
else {
@@ -403,9 +419,8 @@ LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
}
- else {
+ else
processFrame(e, l);
- }
}
@@ -447,7 +462,7 @@ ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) {
mgmtId << id;
cp = new Connection(*this, shadowOut, mgmtId.str(), id);
}
- connections.insert(ConnectionMap::value_type(id, cp));
+ connections.insert(ConnectionMap::value_type(id, cp));
}
return cp;
}
@@ -556,7 +571,8 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
QPID_LOG(info, *this << " send update-offer to " << id);
- mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self);
+ mcast.mcastControl(
+ ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId, CLUSTER_VERSION), self);
}
}
@@ -587,26 +603,29 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
}
}
-// Go back to normal processing after an offer that did not result in an update.
-void Cluster::cancelOffer(const MemberId& updatee, Lock& l) {
- QPID_LOG(info, *this << " cancelled offer to " << updatee);
- deliverEventQueue.start(); // Go back to normal processing
- setReady(l);
- makeOffer(map.firstJoiner(), l); // Maybe make another offer.
-}
-
-void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
+void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid,
+ uint32_t version, Lock& l) {
// NOTE: deliverEventQueue has been stopped at the update offer by
// deliveredEvent in case an update is required.
if (state == LEFT) return;
+ if (version != CLUSTER_VERSION) {
+ QPID_LOG(critical, *this << " incompatible cluster versions " <<
+ version << " != " << CLUSTER_VERSION);
+ leave(l);
+ return;
+ }
MemberId updatee(updateeInt);
boost::optional<Url> url = map.updateOffer(updater, updatee);
if (updater == self) {
assert(state == OFFER);
if (url) // My offer was first.
updateStart(updatee, *url, l);
- else // Another offer was first.
- cancelOffer(updatee, l);
+ else { // Another offer was first.
+ QPID_LOG(info, *this << " cancelled offer to " << updatee << " unstall");
+ setReady(l);
+ makeOffer(map.firstJoiner(), l); // Maybe make another offer.
+ deliverEventQueue.start(); // Go back to normal processing
+ }
}
else if (updatee == self && url) {
assert(state == JOINER);
@@ -615,8 +634,11 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
QPID_LOG(info, *this << " receiving update from " << updater);
checkUpdateIn(l);
}
- else
+ else {
+ QPID_LOG(debug,*this << " unstall, ignore update " << updater
+ << " to " << updatee);
deliverEventQueue.start(); // Not involved in update.
+ }
}
static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) {
@@ -629,21 +651,23 @@ static client::ConnectionSettings connectionSettings(const ClusterSettings& sett
void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
// An offer was received while handling an error, and converted to a retract.
+ // Behavior is very similar to updateOffer.
if (state == LEFT) return;
MemberId updatee(updateeInt);
boost::optional<Url> url = map.updateOffer(updater, updatee);
if (updater == self) {
assert(state == OFFER);
if (url) { // My offer was first.
- QPID_LOG(info, *this << " retracted offer to " << updatee);
+ QPID_LOG(info, *this << " retracting offer to " << updatee);
if (updateThread.id())
updateThread.join(); // Join the previous updateThread to avoid leaks.
updateThread = Thread(new RetractClient(*url, connectionSettings(settings)));
}
- cancelOffer(updatee, l);
+ setReady(l);
+ makeOffer(map.firstJoiner(), l); // Maybe make another offer.
+ // Don't unstall the event queue, that was already done in deliveredFrame
}
- else
- deliverEventQueue.start(); // Not involved in update.
+ QPID_LOG(debug,*this << " retracted offer " << updater << " to " << updatee);
}
void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
@@ -672,6 +696,7 @@ void Cluster::updateInDone(const ClusterMap& m) {
void Cluster::updateInRetracted() {
Lock l(lock);
updateRetracted = true;
+ map.clearStatus();
checkUpdateIn(l);
}