summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp37
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--cpp/src/qpid/cluster/ErrorCheck.cpp11
-rw-r--r--cpp/src/qpid/cluster/Quorum_cman.cpp1
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp3
5 files changed, 28 insertions, 28 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 3d46797679..c5ac2ecfdc 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -228,7 +228,8 @@ Cluster::~Cluster() {
void Cluster::initialize() {
if (myUrl.empty())
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
- QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
+ QPID_LOG(notice, *this << " member " << self << " joining "
+ << name << " with url=" << myUrl);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
@@ -240,7 +241,7 @@ void Cluster::initialize() {
// Called in connection thread to insert a client connection.
void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
- QPID_LOG(debug, *this << " add local connection " << c->getId());
+ QPID_LOG(info, *this << " new local connection " << c->getId());
localConnections.insert(c);
assert(c->getId().getMember() == self);
// Announce the connection to the cluster.
@@ -250,7 +251,7 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
- QPID_LOG(debug, *this << " add shadow connection " << c->getId());
+ QPID_LOG(info, *this << " new shadow connection " << c->getId());
// Safe to use connections here because we're pre-catchup, either
// discarding or stalled, so deliveredFrame is not processing any
// connection events.
@@ -267,7 +268,7 @@ void Cluster::erase(const ConnectionId& id) {
// Called by Connection::deliverClose() in deliverFrameQueue thread.
void Cluster::erase(const ConnectionId& id, Lock&) {
- QPID_LOG(debug, *this << " erasing connection " << id);
+ QPID_LOG(info, *this << " connection closed " << id);
connections.erase(id);
decoder.erase(id);
}
@@ -357,7 +358,7 @@ void Cluster::deliveredEvent(const Event& e) {
// This preserves the connection decoder fragments for an update.
const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
if (offer) {
- QPID_LOG(debug, *this << " stall for update offer from " << e.getMemberId()
+ QPID_LOG(info, *this << " stall for update offer from " << e.getMemberId()
<< " to " << MemberId(offer->getUpdatee()));
deliverEventQueue.stop();
}
@@ -440,7 +441,7 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) {
connection->deliveredFrame(e);
}
else
- QPID_LOG(debug, *this << " DROP (no connection): " << e);
+ QPID_LOG(trace, *this << " DROP (no connection): " << e);
}
else // Drop connection frames while state < CATCHUP
QPID_LOG(trace, *this << " DROP (joining): " << e);
@@ -517,7 +518,7 @@ void Cluster::configChange (
broker.setRecovery(nCurrent == 1);
initialized = true;
}
- QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
+ QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "left: "));
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
@@ -553,7 +554,6 @@ void Cluster::configChange(const MemberId&, const std::string& current, Lock& l)
}
else { // Joining established group.
state = JOINER;
- QPID_LOG(info, *this << " joining cluster: " << map);
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
elders = map.getAlive();
elders.erase(self);
@@ -603,7 +603,7 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
memberUpdate(l);
if (state == CATCHUP && id == self) {
setReady(l);
- QPID_LOG(notice, *this << " caught up, active cluster member");
+ QPID_LOG(notice, *this << " caught up, active cluster member.");
}
}
@@ -635,7 +635,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
assert(state == JOINER);
setClusterId(uuid, l);
state = UPDATEE;
- QPID_LOG(info, *this << " receiving update from " << updater);
+ QPID_LOG(notice, *this << " receiving update from " << updater);
checkUpdateIn(l);
}
else {
@@ -662,7 +662,6 @@ void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l
if (updater == self) {
assert(state == OFFER);
if (url) { // My offer was first.
- QPID_LOG(info, *this << " retracting offer to " << updatee);
if (updateThread.id())
updateThread.join(); // Join the previous updateThread to avoid leaks.
updateThread = Thread(new RetractClient(*url, connectionSettings(settings)));
@@ -679,7 +678,7 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
if (state == LEFT) return;
assert(state == OFFER);
state = UPDATER;
- QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
+ QPID_LOG(notice, *this << " sending update to " << updatee << " at " << url);
if (updateThread.id())
updateThread.join(); // Join the previous updateThread to avoid leaks.
updateThread = Thread(
@@ -711,13 +710,13 @@ void Cluster::checkUpdateIn(Lock&) {
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
discarding = false; // ok to set, we're stalled for update.
- QPID_LOG(info, *this << " received update, starting catch-up");
+ QPID_LOG(notice, *this << " update complete, starting catch-up, members: " << map);
deliverEventQueue.start();
}
else if (updateRetracted) { // Update was retracted, request another update
updateRetracted = false;
state = JOINER;
- QPID_LOG(info, *this << " re-try joining after retracted update");
+ QPID_LOG(notice, *this << " update retracted, sending new update request");
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
deliverEventQueue.start();
}
@@ -729,7 +728,7 @@ void Cluster::updateOutDone() {
}
void Cluster::updateOutDone(Lock& l) {
- QPID_LOG(info, *this << " sent update");
+ QPID_LOG(notice, *this << " update sent");
assert(state == UPDATER);
state = READY;
mcast.release();
@@ -834,9 +833,9 @@ std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
"INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
};
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
- o << cluster.self << "(" << STATE[cluster.state];
+ o << "cluster:" << STATE[cluster.state];
if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error";
- return o << ")";
+ return o;
}
MemberId Cluster::getId() const {
@@ -863,7 +862,7 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) {
mgmtObject->set_clusterID(clusterId.str());
mgmtObject->set_memberID(stream.str());
}
- QPID_LOG(debug, *this << " cluster-id = " << clusterId);
+ QPID_LOG(debug, *this << " cluster-uuid = " << clusterId);
}
void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
@@ -875,7 +874,7 @@ void Cluster::errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&
// ErrorCheck class) then we have processed succesfully past the
// point of the error.
if (state >= CATCHUP && type != ERROR_TYPE_NONE) {
- QPID_LOG(debug, *this << " error " << frameSeq << " did not occur locally.");
+ QPID_LOG(notice, *this << " error " << frameSeq << " did not occur locally.");
mcast.mcastControl(
ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self);
}
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index a898cb5059..03a06e1c09 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -439,13 +439,13 @@ void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
void Connection::exchange(const std::string& encoded) {
Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);
- QPID_LOG(debug, cluster << " decoded exchange " << ex->getName());
+ QPID_LOG(debug, cluster << " updated exchange " << ex->getName());
}
void Connection::queue(const std::string& encoded) {
Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf);
- QPID_LOG(debug, cluster << " decoded queue " << q->getName());
+ QPID_LOG(debug, cluster << " updated queue " << q->getName());
}
void Connection::sessionError(uint16_t , const std::string& msg) {
diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp
index d498b252f5..0a16d492e4 100644
--- a/cpp/src/qpid/cluster/ErrorCheck.cpp
+++ b/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -57,7 +57,7 @@ void ErrorCheck::error(
QPID_LOG(error, cluster
<< (type == ERROR_TYPE_SESSION ? " channel" : " connection")
<< " error " << frameSeq << " on " << c << ": " << msg
- << " (unresolved: " << unresolved << ")");
+ << " must be resolved with: " << unresolved);
mcast.mcastControl(
ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
// If there are already frames queued up by a previous error, review
@@ -87,8 +87,8 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator&
throw Exception("Aborted by failure that did not occur on all replicas");
}
else { // his error is worse/same as mine.
- QPID_LOG(debug, cluster << " error " << frameSeq
- << " outcome agrees with " << i->getMemberId());
+ QPID_LOG(notice, cluster << " error " << frameSeq
+ << " resolved with " << i->getMemberId());
unresolved.erase(i->getMemberId());
checkResolved();
}
@@ -117,10 +117,11 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator&
void ErrorCheck::checkResolved() {
if (unresolved.empty()) { // No more potentially conflicted members, we're clear.
type = ERROR_TYPE_NONE;
- QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved.");
+ QPID_LOG(notice, cluster << " error " << frameSeq << " resolved.");
}
else
- QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved);
+ QPID_LOG(notice, cluster << " error " << frameSeq
+ << " must be resolved with " << unresolved);
}
EventFrame ErrorCheck::getNext() {
diff --git a/cpp/src/qpid/cluster/Quorum_cman.cpp b/cpp/src/qpid/cluster/Quorum_cman.cpp
index 0bf29f8042..32ed5c1d91 100644
--- a/cpp/src/qpid/cluster/Quorum_cman.cpp
+++ b/cpp/src/qpid/cluster/Quorum_cman.cpp
@@ -31,7 +31,6 @@ Quorum::Quorum() : enable(false), cman(0) {}
Quorum::~Quorum() { if (cman) cman_finish(cman); }
void Quorum::init() {
- QPID_LOG(info, "Waiting for cluster quorum");
enable = true;
cman = cman_init(0);
if (cman == 0) throw ErrnoException("Can't connect to cman service");
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 143db20ac0..ac418ffbb6 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -124,7 +124,8 @@ void UpdateClient::run() {
}
void UpdateClient::update() {
- QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
+ QPID_LOG(debug, updaterId << " updating state to " << updateeId
+ << " at " << updateeUrl);
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1));