summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp51
1 files changed, 39 insertions, 12 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f93203acbf..4d54a837ca 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -38,6 +38,7 @@
#include <algorithm>
#include <iterator>
#include <map>
+#include <ostream>
namespace qpid {
namespace cluster {
@@ -67,11 +68,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
)
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
- QPID_LOG(trace, "Joining cluster: " << name << " as " << self);
+ QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self);
cpg.join(name);
- mcastFrame(AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
- ConnectionId(self,0));
-
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
}
@@ -94,7 +92,7 @@ void Cluster::leave() {
// Leave is called by from Broker destructor after the poller has
// been shut down. No dispatches can occur.
- QPID_LOG(debug, "Leaving cluster " << name.str());
+ QPID_LOG(notice, "Leaving cluster " << name.str());
cpg.leave(name);
// broker= is set to 0 when the final config-change is delivered.
while(broker) {
@@ -158,7 +156,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id)
if (i == connections.end()) { // New shadow connection.
assert(id.getMember() != self);
std::ostringstream mgmtId;
- mgmtId << name << ":" << id;
+ mgmtId << name.str() << ":" << id;
ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id));
i = connections.insert(value).first;
}
@@ -205,22 +203,50 @@ void Cluster::deliver(
}
}
+struct AddrList {
+ const cpg_address* addrs;
+ int count;
+ AddrList(const cpg_address* a, int n) : addrs(a), count(n) {}
+};
+
+ostream& operator<<(ostream& o, const AddrList& a) {
+ for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
+ const char* reasonString;
+ switch (p->reason) {
+ case CPG_REASON_JOIN: reasonString = " joined "; break;
+ case CPG_REASON_LEAVE: reasonString = " left ";break;
+ case CPG_REASON_NODEDOWN: reasonString = " node-down ";break;
+ case CPG_REASON_NODEUP: reasonString = " node-up ";break;
+ case CPG_REASON_PROCDOWN: reasonString = " process-down ";break;
+ default: reasonString = " ";
+ }
+ qpid::cluster::MemberId member(*p);
+ o << member << reasonString;
+ }
+ return o;
+}
+
void Cluster::configChange(
cpg_handle_t /*handle*/,
cpg_name */*group*/,
cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
- cpg_address */*joined*/, int /*nJoined*/)
+ cpg_address *joined, int nJoined)
{
- QPID_LOG(debug, "Cluster change: "
- << std::make_pair(current, nCurrent)
- << std::make_pair(left, nLeft));
+ QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: "
+ << AddrList(joined, nJoined) << AddrList(left, nLeft));
+
+ if (nJoined) // Notfiy new members of my URL.
+ mcastFrame(
+ AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
+ ConnectionId(self,0));
+
Mutex::ScopedLock l(lock);
for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
// Add new members when their URL notice arraives.
- if (std::find(left, left+nLeft, self) != left+nLeft)
+ if (find(left, left+nLeft, self) != left+nLeft)
broker = 0; // We have left the group, this is the final config change.
lock.notifyAll(); // Threads waiting for membership changes.
}
@@ -236,7 +262,8 @@ void Cluster::disconnect(sys::DispatchHandle& h) {
broker->shutdown();
}
-void Cluster::urlNotice(const MemberId& m, const std::string& url) {
+void Cluster::urlNotice(const MemberId& m, const string& url) {
+ QPID_LOG(notice, "Cluster member " << m << " has URL " << url);
urls.insert(UrlMap::value_type(m,Url(url)));
}