summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-11-26 17:37:16 +0000
committerAlan Conway <aconway@apache.org>2008-11-26 17:37:16 +0000
commit43e26ecb7cdb04cf7a9c7e87fa7902b7ebe3f5ce (patch)
tree604fadb84ec6df689ca25b7e4ae7f2edf78f71b8 /cpp/src/qpid/cluster/Cluster.cpp
parent300063322dd80c0dee30475de494afdb6a846d6a (diff)
downloadqpid-python-43e26ecb7cdb04cf7a9c7e87fa7902b7ebe3f5ce.tar.gz
Cluster.cpp: Fixed last-node-standing logic, better logging.
Shlib.cpp: added file name to errors messages. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@720924 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp142
1 files changed, 72 insertions, 70 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index d08e06c863..7db4e1a3b9 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -102,7 +102,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
mcastId(0),
mgmtObject(0),
state(INIT),
- lastSize(1)
+ lastSize(0),
+ lastBroker(false)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
@@ -115,7 +116,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
failoverExchange.reset(new FailoverExchange(this));
cpgDispatchHandle.startWatch(poller);
deliverQueue.start();
- QPID_LOG(notice, *this << " joining cluster " << name.str());
+ QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl);
if (useQuorum) quorum.init();
cpg.join(name);
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
@@ -198,9 +199,8 @@ void Cluster::leave() {
void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
- if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
QPID_LOG(notice, *this << " leaving cluster " << name.str());
-
+ if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
if (!deliverQueue.isStopped()) deliverQueue.stop();
try { cpg.leave(name); }
catch (const std::exception& e) {
@@ -258,47 +258,48 @@ void Cluster::deliver(const Event& e, Lock&) {
deliverQueue.push(e); // Otherwise enqueue for processing.
}
+// Entry point: called when deliverQueue has events to process.
void Cluster::delivered(const Event& e) {
- Lock l(lock);
- delivered(e,l);
+ try {
+ Lock l(lock);
+ delivered(e,l);
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
+ leave();
+ }
+
}
void Cluster::delivered(const Event& e, Lock& l) {
- try {
- Buffer buf(e);
- AMQFrame frame;
- if (e.isCluster()) {
- while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
- ClusterDispatcher dispatch(*this, e.getMemberId(), l);
- if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
- throw Exception(QPID_MSG("Invalid cluster control"));
- }
+ Buffer buf(e);
+ AMQFrame frame;
+ if (e.isCluster()) {
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
+ ClusterDispatcher dispatch(*this, e.getMemberId(), l);
+ if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
+ throw Exception(QPID_MSG("Invalid cluster control"));
}
- else { // e.isConnection()
- if (state == NEWBIE) {
- QPID_LOG(trace, *this << " DROP: " << e);
- }
- else {
- boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
- if (!connection) return;
- if (e.getType() == CONTROL) {
- while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
- connection->delivered(frame);
- }
- }
- else {
- QPID_LOG(trace, *this << " DLVR: " << e);
- connection->deliverBuffer(buf);
+ }
+ else { // e.isConnection()
+ if (state == NEWBIE) {
+ QPID_LOG(trace, *this << " DROP: " << e);
+ }
+ else {
+ boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
+ if (!connection) return;
+ if (e.getType() == CONTROL) {
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
+ connection->delivered(frame);
}
}
+ else {
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ connection->deliverBuffer(buf);
+ }
}
}
- catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster delivered: " << e.what());
- leave(l);
- }
}
struct AddrList {
@@ -328,23 +329,24 @@ ostream& operator<<(ostream& o, const AddrList& a) {
return o << a.suffix;
}
+// Entry point: called by IO to dispatch CPG events.
void Cluster::dispatch(sys::DispatchHandle& h) {
try {
cpg.dispatchAll();
h.rewatch();
- }
- catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster deliver: " << e.what());
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what());
leave();
}
}
+// Entry point: called if disconnected from CPG.
void Cluster::disconnect(sys::DispatchHandle& ) {
- QPID_LOG(critical, *this << " disconnected from cluster, shutting down");
+ QPID_LOG(critical, *this << " error disconnected from cluster");
broker.shutdown();
}
-void Cluster::configChange (
+void Cluster::configChange (
cpg_handle_t /*handle*/,
cpg_name */*group*/,
cpg_address *current, int nCurrent,
@@ -372,16 +374,16 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
if (state == INIT) { // First configChange
if (map.aliveCount() == 1) {
setClusterId(true);
- QPID_LOG(info, *this << " first in cluster at " << myUrl);
state = READY;
+ QPID_LOG(notice, *this << " first in cluster");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
map = ClusterMap(myId, myUrl, true);
memberUpdate(l);
}
else { // Joining established group.
state = NEWBIE;
+ QPID_LOG(info, *this << " request state dump");
mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l);
- QPID_LOG(debug, *this << " send dump-request " << myUrl);
}
}
else if (state >= READY && memberChange)
@@ -394,7 +396,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
void Cluster::tryMakeOffer(const MemberId& id, Lock& l) {
if (state == READY && map.isNewbie(id)) {
state = OFFER;
- QPID_LOG(debug, *this << " send dump-offer to " << id);
+ QPID_LOG(info, *this << " send dump-offer to " << id);
mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), l);
}
}
@@ -424,8 +426,8 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
if (map.ready(id, Url(url)))
memberUpdate(l);
if (state == CATCHUP && id == myId) {
- QPID_LOG(debug, *this << " caught-up, going to ready mode.");
state = READY;
+ QPID_LOG(notice, *this << " caught up");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
mcastQueue.clear();
@@ -442,16 +444,16 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid&
dumpStart(myId, dumpee, url->str(), l);
}
else { // Another offer was first.
- QPID_LOG(debug, *this << " cancel dump offer to " << dumpee);
state = READY;
+ QPID_LOG(info, *this << " cancelled dump offer to " << dumpee);
tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
}
}
else if (dumpee == myId && url) {
assert(state == NEWBIE);
- QPID_LOG(debug, *this << " accepted dump-offer from " << dumper);
setClusterId(uuid);
state = DUMPEE;
+ QPID_LOG(info, *this << " receiving dump from " << dumper);
deliverQueue.stop();
checkDumpIn(l);
}
@@ -465,8 +467,8 @@ void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string&
Url url(urlStr);
assert(state == OFFER);
state = DUMPER;
+ QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << urlStr);
deliverQueue.stop();
- QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " << urlStr);
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
dumpThread = Thread(
new DumpClient(myId, dumpee, url, broker, map, getConnections(l),
@@ -484,10 +486,10 @@ void Cluster::checkDumpIn(Lock& l) {
if (state == LEFT) return;
if (state == DUMPEE && dumpedMap) {
map = *dumpedMap;
- QPID_LOG(debug, *this << " incoming dump complete, start catchup. map=" << map);
mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l);
// Don't flush the mcast queue till we are READY, on self-deliver.
state = CATCHUP;
+ QPID_LOG(info, *this << " received dump, starting catch-up");
deliverQueue.start();
}
}
@@ -498,16 +500,16 @@ void Cluster::dumpOutDone() {
}
void Cluster::dumpOutDone(Lock& l) {
- QPID_LOG(debug, *this << " finished sending dump.");
assert(state == DUMPER);
state = READY;
+ QPID_LOG(info, *this << " sent dump");
deliverQueue.start();
tryMakeOffer(map.firstNewbie(), l); // Try another offer
}
void Cluster::dumpOutError(const std::exception& e) {
Monitor::ScopedLock l(lock);
- QPID_LOG(error, *this << " error sending state dump: " << e.what());
+ QPID_LOG(error, *this << " error sending dump: " << e.what());
dumpOutDone(l);
}
@@ -529,9 +531,9 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string
return Manageable::STATUS_OK;
}
-void Cluster::stopClusterNode(Lock&) {
+void Cluster::stopClusterNode(Lock& l) {
QPID_LOG(notice, *this << " stopped by admin");
- leave();
+ leave(l);
}
void Cluster::stopFullCluster(Lock& l) {
@@ -541,27 +543,27 @@ void Cluster::stopFullCluster(Lock& l) {
void Cluster::memberUpdate(Lock& l) {
QPID_LOG(debug, *this << " member update, map=" << map);
- std::vector<Url> vectUrl = getUrls(l);
- size_t size = vectUrl.size();
-
- failoverExchange->setUrls(vectUrl);
+ std::vector<Url> urls = getUrls(l);
+ size_t size = urls.size();
+ failoverExchange->setUrls(urls);
+
+ if (size == 1 && lastSize > 1 && state >= READY) {
+ QPID_LOG(info, *this << " last broker standing, update queue policies");
+ lastBroker = true;
+ broker.getQueues().updateQueueClusterState(true);
+ }
+ else if (size > 1 && lastBroker) {
+ QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
+ lastBroker = false;
+ broker.getQueues().updateQueueClusterState(false);
+ }
+ lastSize = size;
if (mgmtObject) {
-
- if (lastSize != size && size == 1){
- QPID_LOG(info, *this << " last node standing, updating queue policies.");
- broker.getQueues().updateQueueClusterState(true);
- }
- else if (lastSize != size && size > 1) {
- QPID_LOG(info, *this << " recovered from last node standing, updating queue policies, size:" << size);
- broker.getQueues().updateQueueClusterState(false);
- }
- lastSize = size;
-
mgmtObject->set_clusterSize(size);
string urlstr;
- for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
- if (iter != vectUrl.begin()) urlstr += "\n";
+ for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
+ if (iter != urls.begin()) urlstr += "\n";
urlstr += iter->str();
}
mgmtObject->set_members(urlstr);