summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp37
1 files changed, 18 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 3d46797679..c5ac2ecfdc 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -228,7 +228,8 @@ Cluster::~Cluster() {
void Cluster::initialize() {
if (myUrl.empty())
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
- QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
+ QPID_LOG(notice, *this << " member " << self << " joining "
+ << name << " with url=" << myUrl);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
@@ -240,7 +241,7 @@ void Cluster::initialize() {
// Called in connection thread to insert a client connection.
void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
- QPID_LOG(debug, *this << " add local connection " << c->getId());
+ QPID_LOG(info, *this << " new local connection " << c->getId());
localConnections.insert(c);
assert(c->getId().getMember() == self);
// Announce the connection to the cluster.
@@ -250,7 +251,7 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
- QPID_LOG(debug, *this << " add shadow connection " << c->getId());
+ QPID_LOG(info, *this << " new shadow connection " << c->getId());
// Safe to use connections here because we're pre-catchup, either
// discarding or stalled, so deliveredFrame is not processing any
// connection events.
@@ -267,7 +268,7 @@ void Cluster::erase(const ConnectionId& id) {
// Called by Connection::deliverClose() in deliverFrameQueue thread.
void Cluster::erase(const ConnectionId& id, Lock&) {
- QPID_LOG(debug, *this << " erasing connection " << id);
+ QPID_LOG(info, *this << " connection closed " << id);
connections.erase(id);
decoder.erase(id);
}
@@ -357,7 +358,7 @@ void Cluster::deliveredEvent(const Event& e) {
// This preserves the connection decoder fragments for an update.
const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
if (offer) {
- QPID_LOG(debug, *this << " stall for update offer from " << e.getMemberId()
+ QPID_LOG(info, *this << " stall for update offer from " << e.getMemberId()
<< " to " << MemberId(offer->getUpdatee()));
deliverEventQueue.stop();
}
@@ -440,7 +441,7 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) {
connection->deliveredFrame(e);
}
else
- QPID_LOG(debug, *this << " DROP (no connection): " << e);
+ QPID_LOG(trace, *this << " DROP (no connection): " << e);
}
else // Drop connection frames while state < CATCHUP
QPID_LOG(trace, *this << " DROP (joining): " << e);
@@ -517,7 +518,7 @@ void Cluster::configChange (
broker.setRecovery(nCurrent == 1);
initialized = true;
}
- QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
+ QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "left: "));
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
@@ -553,7 +554,6 @@ void Cluster::configChange(const MemberId&, const std::string& current, Lock& l)
}
else { // Joining established group.
state = JOINER;
- QPID_LOG(info, *this << " joining cluster: " << map);
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
elders = map.getAlive();
elders.erase(self);
@@ -603,7 +603,7 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
memberUpdate(l);
if (state == CATCHUP && id == self) {
setReady(l);
- QPID_LOG(notice, *this << " caught up, active cluster member");
+ QPID_LOG(notice, *this << " caught up, active cluster member.");
}
}
@@ -635,7 +635,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
assert(state == JOINER);
setClusterId(uuid, l);
state = UPDATEE;
- QPID_LOG(info, *this << " receiving update from " << updater);
+ QPID_LOG(notice, *this << " receiving update from " << updater);
checkUpdateIn(l);
}
else {
@@ -662,7 +662,6 @@ void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l
if (updater == self) {
assert(state == OFFER);
if (url) { // My offer was first.
- QPID_LOG(info, *this << " retracting offer to " << updatee);
if (updateThread.id())
updateThread.join(); // Join the previous updateThread to avoid leaks.
updateThread = Thread(new RetractClient(*url, connectionSettings(settings)));
@@ -679,7 +678,7 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
if (state == LEFT) return;
assert(state == OFFER);
state = UPDATER;
- QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
+ QPID_LOG(notice, *this << " sending update to " << updatee << " at " << url);
if (updateThread.id())
updateThread.join(); // Join the previous updateThread to avoid leaks.
updateThread = Thread(
@@ -711,13 +710,13 @@ void Cluster::checkUpdateIn(Lock&) {
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
discarding = false; // ok to set, we're stalled for update.
- QPID_LOG(info, *this << " received update, starting catch-up");
+ QPID_LOG(notice, *this << " update complete, starting catch-up, members: " << map);
deliverEventQueue.start();
}
else if (updateRetracted) { // Update was retracted, request another update
updateRetracted = false;
state = JOINER;
- QPID_LOG(info, *this << " re-try joining after retracted update");
+ QPID_LOG(notice, *this << " update retracted, sending new update request");
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
deliverEventQueue.start();
}
@@ -729,7 +728,7 @@ void Cluster::updateOutDone() {
}
void Cluster::updateOutDone(Lock& l) {
- QPID_LOG(info, *this << " sent update");
+ QPID_LOG(notice, *this << " update sent");
assert(state == UPDATER);
state = READY;
mcast.release();
@@ -834,9 +833,9 @@ std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
"INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
};
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
- o << cluster.self << "(" << STATE[cluster.state];
+ o << "cluster:" << STATE[cluster.state];
if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error";
- return o << ")";
+ return o;
}
MemberId Cluster::getId() const {
@@ -863,7 +862,7 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) {
mgmtObject->set_clusterID(clusterId.str());
mgmtObject->set_memberID(stream.str());
}
- QPID_LOG(debug, *this << " cluster-id = " << clusterId);
+ QPID_LOG(debug, *this << " cluster-uuid = " << clusterId);
}
void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
@@ -875,7 +874,7 @@ void Cluster::errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&
// ErrorCheck class) then we have processed succesfully past the
// point of the error.
if (state >= CATCHUP && type != ERROR_TYPE_NONE) {
- QPID_LOG(debug, *this << " error " << frameSeq << " did not occur locally.");
+ QPID_LOG(notice, *this << " error " << frameSeq << " did not occur locally.");
mcast.mcastControl(
ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self);
}