diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 28 |
1 files changed, 12 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 5a0b3a3d5e..e35d3e4175 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -190,6 +190,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : boost::bind(&Cluster::leave, this), "Error delivering frames", poller), + quorum(boost::bind(&Cluster::leave, this)), initialized(false), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), @@ -214,7 +215,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange. broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); - if (settings.quorum) quorum.init(); cpg.join(name); // pump the CPG dispatch manually till we get initialized. while (!initialized) @@ -226,10 +226,10 @@ Cluster::~Cluster() { } void Cluster::initialize() { + if (settings.quorum) quorum.start(poller); if (myUrl.empty()) myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); - QPID_LOG(notice, *this << " member " << self << " joining " - << name << " with url=" << myUrl); + QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); @@ -404,6 +404,7 @@ LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");) LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody())); LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody())); Mutex::ScopedLock l(lock); + if (state == LEFT) return; EventFrame e(efConst); const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody()); if (offer && error.isUnresolved()) { @@ -510,7 +511,7 @@ void Cluster::configChange ( const cpg_name */*group*/, const cpg_address *current, int nCurrent, const cpg_address *left, int nLeft, - const cpg_address */*joined*/, int /*nJoined*/) + const cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); if (state == INIT) { // First config change. @@ -518,8 +519,11 @@ void Cluster::configChange ( broker.setRecovery(nCurrent == 1); initialized = true; } - QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent) - << AddrList(left, nLeft, "left: ")); + QPID_LOG(notice, *this << " membership change: " + << AddrList(current, nCurrent) << "(" + << AddrList(joined, nJoined, "joined: ") + << AddrList(left, nLeft, "left: ") + << ")"); std::string addresses; for (const cpg_address* p = current; p < current+nCurrent; ++p) addresses.append(MemberId(*p).str()); @@ -833,9 +837,9 @@ std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); - o << "cluster:" << STATE[cluster.state]; + o << "cluster(" << cluster.self << " " << STATE[cluster.state]; if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error"; - return o; + return o << ")";; } MemberId Cluster::getId() const { @@ -846,14 +850,6 @@ broker::Broker& Cluster::getBroker() const { return broker; // Immutable, no need to lock. } -void Cluster::checkQuorum() { - if (!quorum.isQuorate()) { - QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down"); - leave(); - throw Exception(QPID_MSG(*this << " disconnected from cluster quorum.")); - } -} - void Cluster::setClusterId(const Uuid& uuid, Lock&) { clusterId = uuid; if (mgmtObject) { |
