diff options
| author | Alan Conway <aconway@apache.org> | 2009-02-27 19:34:47 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-02-27 19:34:47 +0000 |
| commit | 7c79adf16acfeb31cd2b90699c456698237a2e82 (patch) | |
| tree | 743738de6f0d5448ebc8280d45976511de8c9ad4 /cpp/src/qpid/cluster/Cluster.cpp | |
| parent | 739e6e1c66341b6b8dbda6776ada8179765ed347 (diff) | |
| download | qpid-python-7c79adf16acfeb31cd2b90699c456698237a2e82.tar.gz | |
cluster: apply membership updates while in CATCHUP mode.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@748651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 17 |
1 files changed, 8 insertions, 9 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 6b4cd0256c..312d1e90e3 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -234,9 +234,9 @@ void Cluster::deliveredEvent(const Event& e) { // Check for deliver close here so we can erase the // connection decoder safely in this thread. if (frame.getMethod()->isA<ClusterConnectionDeliverCloseBody>()) - decoder.erase(e.getConnectionId()); + decoder.erase(e.getConnectionId()); deliverFrameQueue.push(EventFrame(e, frame)); - } + } } else if (e.getType() == DATA) decoder.decode(e, e.getData()); @@ -345,7 +345,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& broker.getLinks().setPassive(true); } } - else if (state >= READY && memberChange) { + else if (state >= CATCHUP && memberChange) { memberUpdate(l); elders = ClusterMap::intersection(elders, map.getAlive()); if (elders.empty()) { @@ -357,7 +357,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& bool Cluster::isLeader() const { return elders.empty(); } -void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { +void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; QPID_LOG(info, *this << " send update-offer to " << id); @@ -382,7 +382,7 @@ void Cluster::brokerShutdown() { void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) { map.updateRequest(id, url); - tryMakeOffer(id, l); + makeOffer(id, l); } void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { @@ -406,7 +406,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu else { // Another offer was first. setReady(l); QPID_LOG(info, *this << " cancelled update offer to " << updatee); - tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer. + makeOffer(map.firstJoiner(), l); // Maybe make another offer. } } else if (updatee == myId && url) { @@ -446,7 +446,6 @@ void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) { } void Cluster::checkUpdateIn(Lock& ) { - if (state == LEFT) return; if (state == UPDATEE && updatedMap) { map = *updatedMap; mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); @@ -467,7 +466,7 @@ void Cluster::updateOutDone(Lock& l) { state = READY; mcast.release(); deliverFrameQueue.start(); - tryMakeOffer(map.firstJoiner(), l); // Try another offer + makeOffer(map.firstJoiner(), l); // Try another offer } void Cluster::updateOutError(const std::exception& e) { @@ -522,7 +521,7 @@ void Cluster::memberUpdate(Lock& l) { size_t size = urls.size(); failoverExchange->setUrls(urls); - if (size == 1 && lastSize > 1 && state >= READY) { + if (size == 1 && lastSize > 1 && state >= CATCHUP) { QPID_LOG(info, *this << " last broker standing, update queue policies"); lastBroker = true; broker.getQueues().updateQueueClusterState(true); |
