diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 51 |
1 files changed, 39 insertions, 12 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f93203acbf..4d54a837ca 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -38,6 +38,7 @@ #include <algorithm> #include <iterator> #include <map> +#include <ostream> namespace qpid { namespace cluster { @@ -67,11 +68,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : ) { broker->addFinalizer(boost::bind(&Cluster::leave, this)); - QPID_LOG(trace, "Joining cluster: " << name << " as " << self); + QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self); cpg.join(name); - mcastFrame(AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())), - ConnectionId(self,0)); - // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); } @@ -94,7 +92,7 @@ void Cluster::leave() { // Leave is called by from Broker destructor after the poller has // been shut down. No dispatches can occur. - QPID_LOG(debug, "Leaving cluster " << name.str()); + QPID_LOG(notice, "Leaving cluster " << name.str()); cpg.leave(name); // broker= is set to 0 when the final config-change is delivered. while(broker) { @@ -158,7 +156,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) if (i == connections.end()) { // New shadow connection. assert(id.getMember() != self); std::ostringstream mgmtId; - mgmtId << name << ":" << id; + mgmtId << name.str() << ":" << id; ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id)); i = connections.insert(value).first; } @@ -205,22 +203,50 @@ void Cluster::deliver( } } +struct AddrList { + const cpg_address* addrs; + int count; + AddrList(const cpg_address* a, int n) : addrs(a), count(n) {} +}; + +ostream& operator<<(ostream& o, const AddrList& a) { + for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { + const char* reasonString; + switch (p->reason) { + case CPG_REASON_JOIN: reasonString = " joined "; break; + case CPG_REASON_LEAVE: reasonString = " left ";break; + case CPG_REASON_NODEDOWN: reasonString = " node-down ";break; + case CPG_REASON_NODEUP: reasonString = " node-up ";break; + case CPG_REASON_PROCDOWN: reasonString = " process-down ";break; + default: reasonString = " "; + } + qpid::cluster::MemberId member(*p); + o << member << reasonString; + } + return o; +} + void Cluster::configChange( cpg_handle_t /*handle*/, cpg_name */*group*/, cpg_address *current, int nCurrent, cpg_address *left, int nLeft, - cpg_address */*joined*/, int /*nJoined*/) + cpg_address *joined, int nJoined) { - QPID_LOG(debug, "Cluster change: " - << std::make_pair(current, nCurrent) - << std::make_pair(left, nLeft)); + QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " + << AddrList(joined, nJoined) << AddrList(left, nLeft)); + + if (nJoined) // Notfiy new members of my URL. + mcastFrame( + AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())), + ConnectionId(self,0)); + Mutex::ScopedLock l(lock); for (int i = 0; i < nLeft; ++i) urls.erase(left[i]); // Add new members when their URL notice arraives. - if (std::find(left, left+nLeft, self) != left+nLeft) + if (find(left, left+nLeft, self) != left+nLeft) broker = 0; // We have left the group, this is the final config change. lock.notifyAll(); // Threads waiting for membership changes. } @@ -236,7 +262,8 @@ void Cluster::disconnect(sys::DispatchHandle& h) { broker->shutdown(); } -void Cluster::urlNotice(const MemberId& m, const std::string& url) { +void Cluster::urlNotice(const MemberId& m, const string& url) { + QPID_LOG(notice, "Cluster member " << m << " has URL " << url); urls.insert(UrlMap::value_type(m,Url(url))); } |
