summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp28
1 files changed, 12 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 5a0b3a3d5e..e35d3e4175 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -190,6 +190,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
+ quorum(boost::bind(&Cluster::leave, this)),
initialized(false),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
@@ -214,7 +215,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
- if (settings.quorum) quorum.init();
cpg.join(name);
// pump the CPG dispatch manually till we get initialized.
while (!initialized)
@@ -226,10 +226,10 @@ Cluster::~Cluster() {
}
void Cluster::initialize() {
+ if (settings.quorum) quorum.start(poller);
if (myUrl.empty())
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
- QPID_LOG(notice, *this << " member " << self << " joining "
- << name << " with url=" << myUrl);
+ QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
@@ -404,6 +404,7 @@ LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
Mutex::ScopedLock l(lock);
+ if (state == LEFT) return;
EventFrame e(efConst);
const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
if (offer && error.isUnresolved()) {
@@ -510,7 +511,7 @@ void Cluster::configChange (
const cpg_name */*group*/,
const cpg_address *current, int nCurrent,
const cpg_address *left, int nLeft,
- const cpg_address */*joined*/, int /*nJoined*/)
+ const cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
if (state == INIT) { // First config change.
@@ -518,8 +519,11 @@ void Cluster::configChange (
broker.setRecovery(nCurrent == 1);
initialized = true;
}
- QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent)
- << AddrList(left, nLeft, "left: "));
+ QPID_LOG(notice, *this << " membership change: "
+ << AddrList(current, nCurrent) << "("
+ << AddrList(joined, nJoined, "joined: ")
+ << AddrList(left, nLeft, "left: ")
+ << ")");
std::string addresses;
for (const cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
@@ -833,9 +837,9 @@ std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
"INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
};
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
- o << "cluster:" << STATE[cluster.state];
+ o << "cluster(" << cluster.self << " " << STATE[cluster.state];
if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error";
- return o;
+ return o << ")";;
}
MemberId Cluster::getId() const {
@@ -846,14 +850,6 @@ broker::Broker& Cluster::getBroker() const {
return broker; // Immutable, no need to lock.
}
-void Cluster::checkQuorum() {
- if (!quorum.isQuorate()) {
- QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down");
- leave();
- throw Exception(QPID_MSG(*this << " disconnected from cluster quorum."));
- }
-}
-
void Cluster::setClusterId(const Uuid& uuid, Lock&) {
clusterId = uuid;
if (mgmtObject) {