summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp478
1 files changed, 333 insertions, 145 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index b48443526c..9c503d6d13 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -18,19 +18,24 @@
#include "Cluster.h"
#include "Connection.h"
+#include "DumpClient.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterDumpRequestBody.h"
-#include "qpid/framing/ClusterUpdateBody.h"
#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterDumpOfferBody.h"
+#include "qpid/framing/ClusterDumpStartBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Thread.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
#include "qmf/org/apache/qpid/cluster/Package.h"
@@ -55,156 +60,221 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace qmf = qmf::org::apache::qpid::cluster;
+/**@file
+ Threading notes:
+ - Public functions may be called in local connection IO threads.
+ see .h.
+*/
+
+struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
+ qpid::cluster::Cluster& cluster;
+ MemberId member;
+ Cluster::Lock& l;
+ ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {}
+
+ void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); }
+ void ready(const std::string& url) { cluster.ready(member, url, l); }
+ void dumpOffer(uint64_t dumpee) { cluster.dumpOffer(member, dumpee, l); }
+ void dumpStart(uint64_t dumpee, const std::string& url) { cluster.dumpStart(member, dumpee, url, l); }
+ void shutdown() { cluster.shutdown(member, l); }
+
+ bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
+};
+
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker(b),
poller(b.getPoller()),
cpg(*this),
name(name_),
- url(url_),
- self(cpg.self()),
- cpgDispatchHandle(cpg,
- boost::bind(&Cluster::dispatch, this, _1), // read
- 0, // write
- boost::bind(&Cluster::disconnect, this, _1) // disconnect
+ myUrl(url_),
+ memberId(cpg.self()),
+ cpgDispatchHandle(
+ cpg,
+ boost::bind(&Cluster::dispatch, this, _1), // read
+ 0, // write
+ boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
- connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
+ deliverQueue(boost::bind(&Cluster::process, this, _1), poller),
+ mcastId(0),
mgmtObject(0),
- handler(&joiningHandler),
- joiningHandler(*this),
- memberHandler(*this),
- mcastId(),
- lastSize(1)
+ state(INIT),
+ lastSize(1)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
qmf::Package packageInit(agent);
- mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),url.str());
+ mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),myUrl.str());
agent->addObject (mgmtObject);
mgmtObject->set_status("JOINING");
-
// FIXME aconway 2008-09-24:
// if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
}
- QPID_LOG(notice, self << " joining cluster " << name.str());
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
cpgDispatchHandle.startWatch(poller);
cpg.join(name);
+ QPID_LOG(notice, *this << " joining cluster " << name.str());
}
-Cluster::~Cluster() {}
+Cluster::~Cluster() {
+ if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
+}
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(lock);
+ Lock l(lock);
+ assert(!c->isCatchUp());
connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
}
-void Cluster::dumpComplete() { handler->dumpComplete(); }
-
void Cluster::erase(ConnectionId id) {
- Mutex::ScopedLock l(lock);
+ Lock l(lock);
connections.erase(id);
}
-void Cluster::leave() {
- QPID_LOG(notice, self << " leaving cluster " << name.str());
- cpg.leave(name);
- // Defer shut down to the final configChange when the group knows we've left.
+void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) {
+ Lock l(lock);
+ mcastControl(body, cptr, l);
}
-void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) {
- AMQFrame f(body);
- Event e(CONTROL, ConnectionId(self, cptr), f.size(), ++mcastId);
- Buffer buf(e);
- f.encode(buf);
- QPID_LOG(trace, "MCAST " << e << " " << body);
- mcastEvent(e);
+void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr, Lock&) {
+ Lock l(lock);
+ Event e(Event::control(body, ConnectionId(memberId, cptr), ++mcastId));
+ QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
+ mcast(e, l);
}
void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id) {
+ Lock l(lock);
+ mcastBuffer(data, size, connection, id, l);
+}
+
+void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id, Lock&) {
+ Lock l(lock);
Event e(DATA, connection, size, id);
memcpy(e.getData(), data, size);
- QPID_LOG(trace, "MCAST " << e);
- mcastEvent(e);
+ QPID_LOG(trace, *this << " MCAST " << e);
+ mcast(e, l);
}
-void Cluster::mcastEvent(const Event& e) {
- e.mcast(name, cpg);
-}
+void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); }
-size_t Cluster::size() const {
- Mutex::ScopedLock l(lock);
- return map.size();
+void Cluster::mcast(const Event& e, Lock&) {
+ if (state == LEFT) return;
+ if (state < READY && e.isConnection()) {
+ // Stall outgoing connection events.
+ QPID_LOG(trace, *this << " MCAST deferred: " << e );
+ mcastQueue.push_back(e);
+ }
+ else
+ e.mcast(name, cpg);
}
std::vector<Url> Cluster::getUrls() const {
- Mutex::ScopedLock l(lock);
+ Lock l(lock);
+ return getUrls(l);
+}
+
+std::vector<Url> Cluster::getUrls(Lock&) const {
return map.memberUrls();
}
-// FIXME aconway 2008-09-15: volatile for locked/unlocked functions.
-// Check locking from Handler functions.
-boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
- Mutex::ScopedLock l(lock);
- if (id.getMember() == self)
- return boost::intrusive_ptr<Connection>(id.getConnectionPtr());
- ConnectionMap::iterator i = connections.find(id);
+void Cluster::leave() {
+ Lock l(lock);
+ leave(l);
+}
+
+void Cluster::leave(Lock&) {
+ if (state != LEFT) {
+ state = LEFT;
+ QPID_LOG(notice, *this << " leaving cluster " << name.str());
+
+ if (!deliverQueue.isStopped()) deliverQueue.stop();
+ if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
+ try { cpg.leave(name); }
+ catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error leaving process group: " << e.what());
+ }
+ try { broker.shutdown(); }
+ catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error during shutdown, aborting: " << e.what());
+ abort(); // Big trouble.
+ }
+ }
+}
+
+boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) {
+ if (connectionId.getMember() == memberId)
+ return boost::intrusive_ptr<Connection>(connectionId.getPointer());
+ ConnectionMap::iterator i = connections.find(connectionId);
if (i == connections.end()) { // New shadow connection.
- assert(id.getMember() != self);
+ assert(connectionId.getMember() != memberId);
std::ostringstream mgmtId;
- mgmtId << name.str() << ":" << id;
- ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id));
+ mgmtId << name.str() << ":" << connectionId;
+ ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId));
i = connections.insert(value).first;
}
return i->second;
}
-void Cluster::deliver(
+Cluster::Connections Cluster::getConnections(Lock&) {
+ Connections result(connections.size());
+ std::transform(connections.begin(), connections.end(), result.begin(),
+ boost::bind(&ConnectionMap::value_type::second, _1));
+ return result;
+}
+
+void Cluster::deliver(
cpg_handle_t /*handle*/,
cpg_name* /*group*/,
uint32_t nodeid,
uint32_t pid,
void* msg,
- int msg_len)
+ int msg_len)
{
- try {
- MemberId from(nodeid, pid);
- Event e = Event::delivered(from, msg, msg_len);
+ Mutex::ScopedLock l(lock);
+ MemberId from(nodeid, pid);
+ Event e = Event::delivered(from, msg, msg_len);
+ if (state == LEFT) return;
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ if (e.isCluster() && state != DUMPEE) // Process cluster controls immediately unless in DUMPEE state.
+ process(e, l);
+ else if (state != NEWBIE) // Newbie discards events up to the dump offer.
+ deliverQueue.push(e);
+}
+
+void Cluster::process(const Event& e) {
+ Lock l(lock);
+ process(e,l);
+}
- // Process cluster controls immediately
- if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control
- Buffer buf(e);
- AMQFrame frame;
+void Cluster::process(const Event& e, Lock& l) {
+ try {
+ Buffer buf(e);
+ AMQFrame frame;
+ if (e.isCluster()) {
while (frame.decode(buf)) {
- QPID_LOG(trace, "DLVR " << e << " " << frame);
- if (!handler->invoke(e.getConnectionId().getMember(), frame))
+ QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
+ ClusterDispatcher dispatch(*this, e.getMemberId(), l);
+ if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
}
}
- else {
- QPID_LOG(trace, "DLVR" << (connectionEventQueue.isStopped() ? "(stalled)" : "") << " " << e);
- handler->deliver(e);
+ else { // e.isConnection()
+ boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
+ if (e.getType() == DATA) {
+ QPID_LOG(trace, *this << " PROC: " << e);
+ connection->deliverBuffer(buf);
+ }
+ else { // control
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
+ connection->delivered(frame);
+ }
+ }
}
}
catch (const std::exception& e) {
- QPID_LOG(critical, "Error in cluster deliver: " << e.what());
- leave();
- }
-}
-
-void Cluster::connectionEvent(const Event& e) {
- Buffer buf(e);
- boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId());
- assert(connection);
- if (e.getType() == DATA) {
- QPID_LOG(trace, "EXEC: " << e);
- connection->deliverBuffer(buf);
- }
- else { // control
- AMQFrame frame;
- while (frame.decode(buf)) {
- QPID_LOG(trace, "EXEC " << e << " " << frame);
- connection->delivered(frame);
- }
+ QPID_LOG(critical, *this << " error in cluster process: " << e.what());
+ leave(l);
}
}
@@ -236,16 +306,22 @@ ostream& operator<<(ostream& o, const AddrList& a) {
}
void Cluster::dispatch(sys::DispatchHandle& h) {
- cpg.dispatchAll();
- h.rewatch();
+ try {
+ cpg.dispatchAll();
+ h.rewatch();
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error in cluster deliver: " << e.what());
+ leave();
+ }
}
void Cluster::disconnect(sys::DispatchHandle& ) {
- QPID_LOG(critical, self << " unexpectedly disconnected from cluster, shutting down");
+ QPID_LOG(critical, *this << " disconnected from cluster, shutting down");
broker.shutdown();
}
-void Cluster::configChange(
+void Cluster::configChange (
cpg_handle_t /*handle*/,
cpg_name */*group*/,
cpg_address *current, int nCurrent,
@@ -253,49 +329,57 @@ void Cluster::configChange(
cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(debug, "Process members: " << AddrList(current, nCurrent)
+ QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
-
- if (find(left, left+nLeft, self) != left+nLeft) {
- // I have left the group, this is the final config change.
- QPID_LOG(notice, self << " left cluster " << name.str());
- broker.shutdown();
- return;
+ map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
+ updateMemberStats(l);
+ if (state == LEFT) return;
+ if (!map.isAlive(memberId)) { leave(l); return; }
+
+ if(state == INIT) { // First configChange
+ if (map.aliveCount() == 1) {
+ QPID_LOG(info, *this << " first in cluster at " << myUrl);
+ map = ClusterMap(memberId, myUrl, true);
+ unstall(l);
+ }
+ else { // Joining established group.
+ state = NEWBIE;
+ mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), 0, l);
+ QPID_LOG(debug, *this << " send dump-request " << myUrl);
+ }
}
-
- if (map.left(left, nLeft)) updateMemberStats();
- handler->configChange(current, nCurrent, left, nLeft, joined, nJoined);
}
-
-broker::Broker& Cluster::getBroker(){ return broker; }
-
-void Cluster::stall() {
- Mutex::ScopedLock l(lock);
- QPID_LOG(debug, self << " stalling.");
- // Stop processing connection events. We still process config changes
- // and cluster controls in deliver()
- connectionEventQueue.stop();
- if (mgmtObject!=0) mgmtObject->set_status("STALLED");
-
- // FIXME aconway 2008-09-11: Flow control, we should slow down or
- // stop reading from local connections while stalled to avoid an
- // unbounded queue.
+void Cluster::dumpInDone(const ClusterMap& m) {
+ Lock l(lock);
+ dumpedMap = m;
+ checkDumpIn(l);
}
-void Cluster::ready() {
- // Called with lock held
- QPID_LOG(debug, self << " ready at " << url);
- unstall();
- mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+void Cluster::tryMakeOffer(const MemberId& id, Lock& l) {
+ if (state == READY && map.isNewbie(id)) {
+ state = OFFER;
+ QPID_LOG(debug, *this << " send dump-offer to " << id);
+ mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), 0, l);
+ }
}
-void Cluster::unstall() {
+void Cluster::unstall(Lock& l) {
// Called with lock held
- QPID_LOG(debug, self << " un-stalling");
- handler = &memberHandler; // Member mode.
- connectionEventQueue.start(poller);
- if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+ switch (state) {
+ case INIT: case DUMPEE: case DUMPER:
+ QPID_LOG(debug, *this << " unstall: deliver=" << deliverQueue.size()
+ << " mcast=" << mcastQueue.size());
+ deliverQueue.start();
+ state = READY;
+ for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
+ mcastQueue.clear();
+ if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+ break;
+ case LEFT: break;
+ case NEWBIE: case READY: case OFFER:
+ assert(0);
+ }
}
// Called from Broker::~Broker when broker is shut down. At this
@@ -303,17 +387,106 @@ void Cluster::unstall() {
// invoked. We must ensure that CPG has also shut down so no CPG
// callbacks will be invoked.
//
-void Cluster::brokerShutdown() {
- QPID_LOG(notice, self << " shutting down.");
- try { cpg.shutdown(); }
- catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); }
+void Cluster::brokerShutdown() {
+ QPID_LOG(notice, *this << " shutting down ");
+ if (state != LEFT) {
+ try { cpg.shutdown(); }
+ catch (const std::exception& e) {
+ QPID_LOG(error, *this << " during shutdown: " << e.what());
+ }
+ }
delete this;
}
-ManagementObject* Cluster::GetManagementObject(void) const { return mgmtObject; }
+void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) {
+ map.dumpRequest(id, url);
+ tryMakeOffer(id, l);
+}
+
+void Cluster::ready(const MemberId& id, const std::string& url, Lock&) {
+ map.ready(id, Url(url));
+}
+
+void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) {
+ if (state == LEFT) return;
+ MemberId dumpee(dumpeeInt);
+ boost::optional<Url> url = map.dumpOffer(dumper, dumpee);
+ if (dumper == memberId) {
+ assert(state == OFFER);
+ if (url) { // My offer was first.
+ QPID_LOG(debug, *this << " mark dump point for dump to " << dumpee);
+ // Put dump-start on my own deliver queue to mark the stall point.
+ // We will stall when it is processed.
+ deliverQueue.push(Event::control(ClusterDumpStartBody(ProtocolVersion(), dumpee, url->str()), memberId));
+ }
+ else { // Another offer was first.
+ QPID_LOG(debug, *this << " cancel dump offer to " << dumpee);
+ state = READY;
+ tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
+ }
+ }
+ else if (dumpee == memberId && url) {
+ assert(state == NEWBIE);
+ QPID_LOG(debug, *this << " accepted dump-offer from " << dumper);
+ state = DUMPEE;
+ checkDumpIn(l);
+ }
+}
+
+void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock& l) {
+ if (state == LEFT) return;
+ MemberId dumpee(dumpeeInt);
+ Url url(urlStr);
+ assert(state == OFFER);
+ deliverQueue.stop();
+ QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " << urlStr);
+ state = DUMPER;
+ if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
+ dumpThread = Thread(
+ new DumpClient(memberId, dumpee, url, broker, map, getConnections(l),
+ boost::bind(&Cluster::dumpOutDone, this),
+ boost::bind(&Cluster::dumpOutError, this, _1)));
+}
+
+void Cluster::checkDumpIn(Lock& l) {
+ if (state == LEFT) return;
+ assert(state == DUMPEE || state == NEWBIE);
+ if (state == DUMPEE && dumpedMap) {
+ map = *dumpedMap;
+ QPID_LOG(debug, *this << " incoming dump complete. Members: " << map);
+ unstall(l);
+ mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l);
+ }
+}
+
+void Cluster::dumpOutDone() {
+ Monitor::ScopedLock l(lock);
+ dumpOutDone(l);
+}
+
+void Cluster::dumpOutDone(Lock& l) {
+ QPID_LOG(debug, *this << " finished sending dump.");
+ assert(state == DUMPER);
+ unstall(l);
+ tryMakeOffer(map.firstNewbie(), l); // Try another offer
+}
+
+void Cluster::dumpOutError(const std::exception& e) {
+ Monitor::ScopedLock l(lock);
+ QPID_LOG(error, *this << " error sending state dump: " << e.what());
+ dumpOutDone(l);
+}
+
+void Cluster ::shutdown(const MemberId& id, Lock& l) {
+ QPID_LOG(notice, *this << " received shutdown from " << id);
+ leave(l);
+}
+
+ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; }
Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) {
- QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+ Lock l(lock);
+ QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]");
switch (methodId) {
case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break;
case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break;
@@ -322,30 +495,32 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string
return Manageable::STATUS_OK;
}
-void Cluster::stopClusterNode(void) {
- QPID_LOG(notice, self << " stopped by admin");
+void Cluster::stopClusterNode() {
+ QPID_LOG(notice, *this << " stopped by admin");
leave();
}
-void Cluster::stopFullCluster(void) {
- QPID_LOG(notice, self << " sending shutdown to cluster.");
- mcastControl(ClusterShutdownBody(), 0);
+void Cluster::stopFullCluster() {
+ Lock l(lock);
+ QPID_LOG(notice, *this << " shutting down cluster " << name.str());
+ mcastControl(ClusterShutdownBody(), 0, l);
}
-void Cluster::updateMemberStats() {
+void Cluster::updateMemberStats(Lock& l) {
if (mgmtObject) {
- if (lastSize != size() && size() ==1){
- QPID_LOG(info, "Last node standing, updating queue policies, size:" <<size());
- broker.getQueues().updateQueueClusterState(true);
- lastSize = size();
- }else if (lastSize != size() && size() > 1) {
- QPID_LOG(info, "Recover back from last node standing, updating queue policies, size:" <<size());
- broker.getQueues().updateQueueClusterState(false);
- lastSize = size();
- }
+ std::vector<Url> vectUrl = getUrls(l);
+ size_t size = vectUrl.size();
+ if (lastSize != size && size == 1){
+ QPID_LOG(info, *this << " last node standing, updating queue policies.");
+ broker.getQueues().updateQueueClusterState(true);
+ }
+ else if (lastSize != size && size > 1) {
+ QPID_LOG(info, *this << " recovered from last node standing, updating queue policies, size:" << size);
+ broker.getQueues().updateQueueClusterState(false);
+ }
+ lastSize = size;
- mgmtObject->set_clusterSize(size());
- std::vector<Url> vectUrl = getUrls();
+ mgmtObject->set_clusterSize(size);
string urlstr;
for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
if (iter != vectUrl.begin()) urlstr += "\n";
@@ -355,4 +530,17 @@ void Cluster::updateMemberStats() {
}
}
+std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
+ static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "READY", "OFFER", "DUMPER", "LEFT" };
+ return o << cluster.memberId << "(" << STATE[cluster.state] << ")";
+}
+
+MemberId Cluster::getId() const {
+ return memberId; // Immutable, no need to lock.
+}
+
+broker::Broker& Cluster::getBroker() const {
+ return broker; // Immutable, no need to lock.
+}
+
}} // namespace qpid::cluster