path: root/cpp/src/qpid/cluster/Cluster.cpp
author    Alan Conway <aconway@apache.org>    2008-10-16 17:07:26 +0000
committer Alan Conway <aconway@apache.org>    2008-10-16 17:07:26 +0000
commit    d39a165c9c8d1fa2fd728a2237117efa71848874 (patch)
tree      dd07b81f1f2d2de42ce2fdf28432130566a5622e /cpp/src/qpid/cluster/Cluster.cpp
parent    f7a4f7bcf77726767d0905f56f5c44c7a34d82a3 (diff)
download  qpid-python-d39a165c9c8d1fa2fd728a2237117efa71848874.tar.gz
Fix race in cluster causing incorrect known-broker lists to be sent to clients.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705287 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--  cpp/src/qpid/cluster/Cluster.cpp  214
1 file changed, 115 insertions, 99 deletions
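
In brief, the change visible in the diff below: CPG membership (configChange) callbacks are no longer applied to the cluster map directly from the callback thread. The member addresses are encoded into a ClusterConfigChangeBody control and pushed onto the same deliver queue as every other cluster event, so membership updates, and the known-broker lists derived from them, are applied in total delivery order. A new CATCHUP state defers flushing queued outgoing connection events until the broker's own ready control is self-delivered. The standalone snippet that follows is only a rough illustration of that queue-serialization pattern; the names (DeliverQueue, Event, knownBrokers) are simplified stand-ins invented for the example, not the actual qpid classes.

// Sketch: apply membership changes only from the ordered deliver queue,
// never directly from the group-communication callback. Hypothetical
// simplified types, not qpid code.
#include <deque>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

struct Event {
    enum Kind { CONFIG_CHANGE, CONNECTION_DATA };
    Kind kind;
    std::string payload;            // encoded member addresses or frame data
};

class DeliverQueue {                // stand-in for the pollable deliver queue
  public:
    explicit DeliverQueue(std::function<void(const Event&)> cb)
        : callback(cb), stopped(true) {}
    void start() { stopped = false; drain(); }
    void stop()  { stopped = true; }
    void push(const Event& e) { q.push_back(e); drain(); }
  private:
    void drain() {
        while (!stopped && !q.empty()) {
            Event e = q.front();
            q.pop_front();
            callback(e);            // events processed strictly in push order
        }
    }
    std::function<void(const Event&)> callback;
    std::deque<Event> q;
    bool stopped;
};

int main() {
    std::vector<std::string> knownBrokers;
    DeliverQueue deliverQueue([&](const Event& e) {
        if (e.kind == Event::CONFIG_CHANGE) {
            // Membership is updated only here, in delivery order, which is
            // what removes the race on the known-broker list.
            knownBrokers.assign(1, e.payload);
            std::cout << "known brokers now: " << e.payload << "\n";
        } else {
            std::cout << "connection event: " << e.payload << "\n";
        }
    });
    deliverQueue.start();
    // A configChange callback would enqueue an event like this instead of
    // mutating the membership map in place:
    deliverQueue.push(Event{Event::CONFIG_CHANGE, std::string("node-1 node-2")});
    deliverQueue.push(Event{Event::CONNECTION_DATA, std::string("frame bytes")});
    return 0;
}
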
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 811c1c9557..a0bcb9ae02 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -30,6 +30,7 @@
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterDumpRequestBody.h"
#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterDumpOfferBody.h"
#include "qpid/framing/ClusterDumpStartBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
@@ -76,6 +77,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); }
void ready(const std::string& url) { cluster.ready(member, url, l); }
+ void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
void dumpOffer(uint64_t dumpee) { cluster.dumpOffer(member, dumpee, l); }
void dumpStart(uint64_t dumpee, const std::string& url) { cluster.dumpStart(member, dumpee, url, l); }
void shutdown() { cluster.shutdown(member, l); }
@@ -89,14 +91,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
cpg(*this),
name(name_),
myUrl(url_),
- memberId(cpg.self()),
+ myId(cpg.self()),
cpgDispatchHandle(
cpg,
boost::bind(&Cluster::dispatch, this, _1), // read
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
- deliverQueue(boost::bind(&Cluster::process, this, _1), poller),
+ deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
mcastId(0),
mgmtObject(0),
state(INIT),
@@ -115,20 +117,20 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
failoverExchange.reset(new FailoverExchange(this));
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
cpgDispatchHandle.startWatch(poller);
+ deliverQueue.start();
cpg.join(name);
- QPID_LOG(notice, *this << " joining cluster " << name.str());
+ QPID_LOG(notice, *this << " will join cluster " << name.str());
}
Cluster::~Cluster() {
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
}
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+bool Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
Lock l(lock);
- // FIXME aconway 2008-10-08: what keeps catchUp connections in memory if not in map?
- // esp shadow connections? See race comment in getConnection.
- assert(!c->isCatchUp());
- connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+ bool result = connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)).second;
+ assert(result);
+ return result;
}
void Cluster::erase(ConnectionId id) {
@@ -136,14 +138,19 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
-void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) {
+void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq) {
Lock l(lock);
- mcastControl(body, cptr, l);
+ mcastControl(body, id, seq, l);
}
-void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr, Lock&) {
- Lock l(lock);
- Event e(Event::control(body, ConnectionId(memberId, cptr), ++mcastId));
+void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq, Lock& l) {
+ Event e(Event::control(body, id, seq));
+ QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
+ mcast(e, l);
+}
+
+void Cluster::mcastControl(const framing::AMQBody& body, Lock& l) {
+ Event e(Event::control(body, ConnectionId(myId,0), ++mcastId));
QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
mcast(e, l);
}
@@ -166,8 +173,8 @@ void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); }
void Cluster::mcast(const Event& e, Lock&) {
if (state == LEFT)
return;
- if (state < READY && e.isConnection()) {
- // Stall outgoing connection events.
+ if (state <= CATCHUP && e.isConnection()) {
+ // Stall outgoing connection events until we are fully READY
QPID_LOG(trace, *this << " MCAST deferred: " << e );
mcastQueue.push_back(e);
}
@@ -192,10 +199,10 @@ void Cluster::leave() {
void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
+ if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
QPID_LOG(notice, *this << " leaving cluster " << name.str());
if (!deliverQueue.isStopped()) deliverQueue.stop();
- if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
try { cpg.leave(name); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error leaving process group: " << e.what());
@@ -211,14 +218,15 @@ void Cluster::leave(Lock&) {
boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) {
ConnectionMap::iterator i = connections.find(connectionId);
if (i == connections.end()) {
- if (connectionId.getMember() == memberId) { // Closed local connection
+ if (connectionId.getMember() == myId) { // Closed local connection
QPID_LOG(warning, *this << " attempt to use closed connection " << connectionId);
return boost::intrusive_ptr<Connection>();
}
else { // New shadow connection
std::ostringstream mgmtId;
mgmtId << name.str() << ":" << connectionId;
- ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId));
+ ConnectionMap::value_type value(connectionId,
+ new Connection(*this, shadowOut, mgmtId.str(), connectionId));
i = connections.insert(value).first;
}
}
@@ -242,50 +250,54 @@ void Cluster::deliver(
{
Mutex::ScopedLock l(lock);
MemberId from(nodeid, pid);
- Event e = Event::delivered(from, msg, msg_len);
+ deliver(Event::delivered(from, msg, msg_len), l);
+}
+
+void Cluster::deliver(const Event& e, Lock&) {
if (state == LEFT) return;
- QPID_LOG(trace, *this << " DLVR: " << e);
- if (e.isCluster() && state != DUMPEE) // Process cluster controls immediately unless in DUMPEE state.
- process(e, l);
- else if (state != NEWBIE) // Newbie discards events up to the dump offer.
- deliverQueue.push(e);
+ QPID_LOG(trace, *this << " PUSH: " << e);
+ deliverQueue.push(e); // Otherwise enqueue for processing.
}
-void Cluster::process(const Event& e) {
+void Cluster::delivered(const Event& e) {
Lock l(lock);
- process(e,l);
+ delivered(e,l);
}
-void Cluster::process(const Event& e, Lock& l) {
+void Cluster::delivered(const Event& e, Lock& l) {
try {
Buffer buf(e);
AMQFrame frame;
if (e.isCluster()) {
while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
+ QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
ClusterDispatcher dispatch(*this, e.getMemberId(), l);
if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
}
}
else { // e.isConnection()
- boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
- if (connection) { // Ignore if no connection.
- if (e.getType() == DATA) {
- QPID_LOG(trace, *this << " PROC: " << e);
- connection->deliverBuffer(buf);
- }
- else { // control
+ if (state == NEWBIE) {
+ QPID_LOG(trace, *this << " DROP: " << e);
+ }
+ else {
+ boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
+ if (!connection) return;
+ if (e.getType() == CONTROL) {
while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
+ QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
connection->delivered(frame);
}
}
+ else {
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ connection->deliverBuffer(buf);
+ }
}
}
}
catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster process: " << e.what());
+ QPID_LOG(critical, *this << " error in cluster delivered: " << e.what());
leave(l);
}
}
@@ -304,11 +316,11 @@ ostream& operator<<(ostream& o, const AddrList& a) {
for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
const char* reasonString;
switch (p->reason) {
- case CPG_REASON_JOIN: reasonString = " joined "; break;
- case CPG_REASON_LEAVE: reasonString = " left "; break;
- case CPG_REASON_NODEDOWN: reasonString = " node-down "; break;
- case CPG_REASON_NODEUP: reasonString = " node-up "; break;
- case CPG_REASON_PROCDOWN: reasonString = " process-down "; break;
+ case CPG_REASON_JOIN: reasonString = " (joined) "; break;
+ case CPG_REASON_LEAVE: reasonString = " (left) "; break;
+ case CPG_REASON_NODEDOWN: reasonString = " (node-down) "; break;
+ case CPG_REASON_NODEUP: reasonString = " (node-up) "; break;
+ case CPG_REASON_PROCDOWN: reasonString = " (process-down) "; break;
default: reasonString = " ";
}
qpid::cluster::MemberId member(*p);
@@ -338,61 +350,52 @@ void Cluster::configChange (
cpg_name */*group*/,
cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
- cpg_address *joined, int nJoined)
+ cpg_address */*joined*/, int /*nJoined*/)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent)
+ QPID_LOG(debug, *this << " enqueue config change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
- map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
+ std::string addresses;
+ for (cpg_address* p = current; p < current+nCurrent; ++p)
+ addresses.append(MemberId(*p).str());
+ deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId), l);
+}
+
+void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
+ bool memberChange = map.configChange(addresses);
if (state == LEFT) return;
- if (!map.isAlive(memberId)) { leave(l); return; }
- if(state == INIT) { // First configChange
- if (map.aliveCount() == 1) {
+ if (!map.isAlive(myId)) { // Final config change.
+ leave(l);
+ return;
+ }
+
+ if (state == INIT) { // First configChange
+ if (map.aliveCount() == 1) {
QPID_LOG(info, *this << " first in cluster at " << myUrl);
- map = ClusterMap(memberId, myUrl, true);
+ state = READY;
+ if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+ map = ClusterMap(myId, myUrl, true);
memberUpdate(l);
- unstall(l);
}
else { // Joining established group.
state = NEWBIE;
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), 0, l);
+ mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l);
QPID_LOG(debug, *this << " send dump-request " << myUrl);
}
}
- else if (state >= READY)
+ else if (state >= READY && memberChange)
memberUpdate(l);
}
-void Cluster::dumpInDone(const ClusterMap& m) {
- Lock l(lock);
- dumpedMap = m;
- checkDumpIn(l);
-}
+
+
void Cluster::tryMakeOffer(const MemberId& id, Lock& l) {
if (state == READY && map.isNewbie(id)) {
state = OFFER;
QPID_LOG(debug, *this << " send dump-offer to " << id);
- mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), 0, l);
- }
-}
-
-void Cluster::unstall(Lock& l) {
- // Called with lock held
- switch (state) {
- case INIT: case DUMPEE: case DUMPER: case READY:
- QPID_LOG(debug, *this << " unstall: deliver=" << deliverQueue.size()
- << " mcast=" << mcastQueue.size());
- deliverQueue.start();
- state = READY;
- for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
- mcastQueue.clear();
- if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
- break;
- case LEFT: break;
- case NEWBIE: case OFFER:
- assert(0);
+ mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), l);
}
}
@@ -418,23 +421,25 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) {
}
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
- map.ready(id, Url(url));
- if (id == memberId)
- unstall(l);
- memberUpdate(l);
+ if (map.ready(id, Url(url)))
+ memberUpdate(l);
+ if (state == CATCHUP && id == myId) {
+ QPID_LOG(debug, *this << " caught-up, going to ready mode.");
+ state = READY;
+ if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+ for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
+ mcastQueue.clear();
+ }
}
void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) {
if (state == LEFT) return;
MemberId dumpee(dumpeeInt);
boost::optional<Url> url = map.dumpOffer(dumper, dumpee);
- if (dumper == memberId) {
+ if (dumper == myId) {
assert(state == OFFER);
if (url) { // My offer was first.
- QPID_LOG(debug, *this << " mark dump point for dump to " << dumpee);
- // Put dump-start on my own deliver queue to mark the stall point.
- // We will stall when it is processed.
- deliverQueue.push(Event::control(ClusterDumpStartBody(ProtocolVersion(), dumpee, url->str()), memberId));
+ dumpStart(myId, dumpee, url->str(), l);
}
else { // Another offer was first.
QPID_LOG(debug, *this << " cancel dump offer to " << dumpee);
@@ -442,38 +447,47 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) {
tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
}
}
- else if (dumpee == memberId && url) {
+ else if (dumpee == myId && url) {
assert(state == NEWBIE);
QPID_LOG(debug, *this << " accepted dump-offer from " << dumper);
state = DUMPEE;
+ deliverQueue.stop();
checkDumpIn(l);
}
}
+// FIXME aconway 2008-10-15: no longer need a separate control now
+// that the dump control is in the deliver queue.
void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock& l) {
if (state == LEFT) return;
MemberId dumpee(dumpeeInt);
Url url(urlStr);
assert(state == OFFER);
+ state = DUMPER;
deliverQueue.stop();
QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " << urlStr);
- state = DUMPER;
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
dumpThread = Thread(
- new DumpClient(memberId, dumpee, url, broker, map, getConnections(l),
+ new DumpClient(myId, dumpee, url, broker, map, getConnections(l),
boost::bind(&Cluster::dumpOutDone, this),
boost::bind(&Cluster::dumpOutError, this, _1)));
}
+void Cluster::dumpInDone(const ClusterMap& m) {
+ Lock l(lock);
+ dumpedMap = m;
+ checkDumpIn(l);
+}
+
void Cluster::checkDumpIn(Lock& l) {
if (state == LEFT) return;
- assert(state == DUMPEE || state == NEWBIE);
if (state == DUMPEE && dumpedMap) {
map = *dumpedMap;
- QPID_LOG(debug, *this << " incoming dump complete. Members: " << map);
- mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l);
- state = READY;
- // unstall when ready control is self-delivered.
+ QPID_LOG(debug, *this << " incoming dump complete, start catchup. map=" << map);
+ mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l);
+ // Don't flush the mcast queue till we are READY, on self-deliver.
+ state = CATCHUP;
+ deliverQueue.start();
}
}
@@ -485,7 +499,8 @@ void Cluster::dumpOutDone() {
void Cluster::dumpOutDone(Lock& l) {
QPID_LOG(debug, *this << " finished sending dump.");
assert(state == DUMPER);
- unstall(l);
+ state = READY;
+ deliverQueue.start();
tryMakeOffer(map.firstNewbie(), l); // Try another offer
}
@@ -504,7 +519,7 @@ ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; }
Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) {
Lock l(lock);
- QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]");
+ QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]");
switch (methodId) {
case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break;
case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break;
@@ -520,10 +535,11 @@ void Cluster::stopClusterNode(Lock&) {
void Cluster::stopFullCluster(Lock& l) {
QPID_LOG(notice, *this << " shutting down cluster " << name.str());
- mcastControl(ClusterShutdownBody(), 0, l);
+ mcastControl(ClusterShutdownBody(), l);
}
void Cluster::memberUpdate(Lock& l) {
+ QPID_LOG(debug, *this << " member update, map=" << map);
std::vector<Url> vectUrl = getUrls(l);
size_t size = vectUrl.size();
@@ -552,12 +568,12 @@ void Cluster::memberUpdate(Lock& l) {
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
- static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "READY", "OFFER", "DUMPER", "LEFT" };
- return o << cluster.memberId << "(" << STATE[cluster.state] << ")";
+ static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "CATCHUP", "READY", "OFFER", "DUMPER", "LEFT" };
+ return o << cluster.myId << "(" << STATE[cluster.state] << ")";
}
MemberId Cluster::getId() const {
- return memberId; // Immutable, no need to lock.
+ return myId; // Immutable, no need to lock.
}
broker::Broker& Cluster::getBroker() const {